feat:support traffic mirroring.

pull/1647/head
Haotian Zhang 2 months ago
parent 99d3f56242
commit 2702c49aab

@ -22,8 +22,9 @@ import java.net.http.HttpClient;
import java.net.http.HttpResponse;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import com.tencent.cloud.common.metadata.MetadataContextHolder;
import com.tencent.cloud.plugin.trafficmirroring.config.TrafficMirroringProperties;
@ -35,6 +36,7 @@ import com.tencent.polaris.api.plugin.route.RouterConstants;
import com.tencent.polaris.api.utils.ClassUtils;
import com.tencent.polaris.api.utils.StringUtils;
import com.tencent.polaris.client.pojo.Node;
import com.tencent.polaris.client.util.NamedThreadFactory;
import com.tencent.polaris.metadata.core.MetadataObjectValue;
import com.tencent.polaris.metadata.core.MetadataType;
import feign.Request;
@ -57,18 +59,12 @@ public class TrafficMirroringPostPlugin implements EnhancedPlugin {
private final TrafficMirroringProperties trafficMirroringProperties;
private final ExecutorService executorService;
private ThreadPoolExecutor threadPoolExecutor;
private final HttpClient httpClient;
private HttpClient httpClient;
public TrafficMirroringPostPlugin(TrafficMirroringProperties trafficMirroringProperties) {
this.trafficMirroringProperties = trafficMirroringProperties;
this.executorService = Executors.newFixedThreadPool(trafficMirroringProperties.getRequestPoolSize());
// create JDK HttpClient instance.
this.httpClient = HttpClient.newBuilder()
.connectTimeout(Duration.ofMillis(trafficMirroringProperties.getRequestConnectionTimeout()))
.executor(executorService)
.build();
}
@Override
@ -116,17 +112,38 @@ public class TrafficMirroringPostPlugin implements EnhancedPlugin {
return;
}
initIfNeeded();
byte[] body = context.getOriginBody();
// async send mirroring request.
String finalDestination = destination;
executorService.submit(() -> {
try {
sendMirroringRequest(originalRequest, body, finalDestination);
}
catch (Exception e) {
LOG.warn("Traffic mirroring request failed.", e);
// send mirroring request.
try {
sendMirroringRequest(originalRequest, body, destination);
}
catch (Exception e) {
LOG.warn("Traffic mirroring request failed.", e);
}
}
private void initIfNeeded() {
if (threadPoolExecutor == null) {
// create thread pool executor.
int cpuCores = Runtime.getRuntime().availableProcessors();
int corePoolSize = cpuCores;
int maxPoolSize = trafficMirroringProperties.getRequestPoolSize();
if (cpuCores > trafficMirroringProperties.getRequestPoolSize()) {
corePoolSize = trafficMirroringProperties.getRequestPoolSize();
}
});
this.threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, maxPoolSize,
60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), new NamedThreadFactory("polaris-traffic-mirroring"));
}
if (httpClient == null) {
// create JDK HttpClient instance.
this.httpClient = HttpClient.newBuilder()
.connectTimeout(Duration.ofMillis(trafficMirroringProperties.getRequestConnectionTimeout()))
.executor(threadPoolExecutor)
.build();
}
}
private void setHeaders(java.net.http.HttpRequest.Builder requestBuilder, Object originalRequest) {

@ -33,9 +33,9 @@ public class TrafficMirroringProperties {
private boolean enabled = true;
/**
* Traffic mirroring request pool size. Default is 4.
* Traffic mirroring request pool size. Default is 100.
*/
private int requestPoolSize = 4;
private int requestPoolSize = 100;
/**
* Traffic mirroring request connection timeout in millisecond. Default is 5000.

Loading…
Cancel
Save