|
|
|
@ -22,6 +22,7 @@ import java.net.URI;
|
|
|
|
|
import java.nio.charset.StandardCharsets;
|
|
|
|
|
import java.util.ArrayList;
|
|
|
|
|
import java.util.Arrays;
|
|
|
|
|
import java.util.HashSet;
|
|
|
|
|
import java.util.List;
|
|
|
|
|
import java.util.Objects;
|
|
|
|
|
import java.util.Set;
|
|
|
|
@ -41,6 +42,7 @@ import org.springframework.cloud.gateway.discovery.DiscoveryLocatorProperties;
|
|
|
|
|
import org.springframework.cloud.gateway.filter.GatewayFilter;
|
|
|
|
|
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
|
|
|
|
|
import org.springframework.cloud.gateway.filter.factory.SpringCloudCircuitBreakerFilterFactory;
|
|
|
|
|
import org.springframework.cloud.gateway.route.Route;
|
|
|
|
|
import org.springframework.cloud.gateway.support.HttpStatusHolder;
|
|
|
|
|
import org.springframework.cloud.gateway.support.ServiceUnavailableException;
|
|
|
|
|
import org.springframework.core.io.buffer.DataBuffer;
|
|
|
|
@ -58,6 +60,7 @@ import static java.util.Optional.ofNullable;
|
|
|
|
|
import static org.springframework.cloud.gateway.support.GatewayToStringStyler.filterToStringCreator;
|
|
|
|
|
import static org.springframework.cloud.gateway.support.ServerWebExchangeUtils.CIRCUITBREAKER_EXECUTION_EXCEPTION_ATTR;
|
|
|
|
|
import static org.springframework.cloud.gateway.support.ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR;
|
|
|
|
|
import static org.springframework.cloud.gateway.support.ServerWebExchangeUtils.GATEWAY_ROUTE_ATTR;
|
|
|
|
|
import static org.springframework.cloud.gateway.support.ServerWebExchangeUtils.containsEncodedParts;
|
|
|
|
|
import static org.springframework.cloud.gateway.support.ServerWebExchangeUtils.reset;
|
|
|
|
|
|
|
|
|
@ -180,52 +183,75 @@ public class PolarisCircuitBreakerFilterFactory extends SpringCloudCircuitBreake
|
|
|
|
|
return new GatewayFilter() {
|
|
|
|
|
@Override
|
|
|
|
|
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
|
|
|
|
|
Route route = exchange.getAttribute(GATEWAY_ROUTE_ATTR);
|
|
|
|
|
String serviceName = circuitBreakerId;
|
|
|
|
|
if (route != null) {
|
|
|
|
|
serviceName = route.getUri().getHost();
|
|
|
|
|
}
|
|
|
|
|
String path = exchange.getRequest().getPath().value();
|
|
|
|
|
ReactiveCircuitBreaker cb = reactiveCircuitBreakerFactory.create(circuitBreakerId + "#" + path);
|
|
|
|
|
return cb.run(chain.filter(exchange).doOnSuccess(v -> {
|
|
|
|
|
if (statuses.contains(exchange.getResponse().getStatusCode())) {
|
|
|
|
|
HttpStatus status = exchange.getResponse().getStatusCode();
|
|
|
|
|
throw new CircuitBreakerStatusCodeException(status);
|
|
|
|
|
}
|
|
|
|
|
}), t -> {
|
|
|
|
|
if (config.getFallbackUri() == null) {
|
|
|
|
|
if (t instanceof CallAbortedException) {
|
|
|
|
|
CircuitBreakerStatus.FallbackInfo fallbackInfo = ((CallAbortedException) t).getFallbackInfo();
|
|
|
|
|
if (fallbackInfo != null) {
|
|
|
|
|
ServerHttpResponse response = exchange.getResponse();
|
|
|
|
|
response.setRawStatusCode(fallbackInfo.getCode());
|
|
|
|
|
if (fallbackInfo.getHeaders() != null) {
|
|
|
|
|
fallbackInfo.getHeaders().forEach((k, v) -> response.getHeaders().add(k, v));
|
|
|
|
|
ReactiveCircuitBreaker cb = reactiveCircuitBreakerFactory.create(serviceName + "#" + path);
|
|
|
|
|
return cb.run(
|
|
|
|
|
chain.filter(exchange)
|
|
|
|
|
.doOnSuccess(v -> {
|
|
|
|
|
// throw CircuitBreakerStatusCodeException by default for all need checking status
|
|
|
|
|
// so polaris can report right error status
|
|
|
|
|
Set<HttpStatus> statusNeedToCheck = new HashSet<>();
|
|
|
|
|
statusNeedToCheck.addAll(statuses);
|
|
|
|
|
statusNeedToCheck.addAll(getDefaultStatus());
|
|
|
|
|
if (statusNeedToCheck.contains(exchange.getResponse().getStatusCode())) {
|
|
|
|
|
HttpStatus status = exchange.getResponse().getStatusCode();
|
|
|
|
|
throw new CircuitBreakerStatusCodeException(status);
|
|
|
|
|
}
|
|
|
|
|
}),
|
|
|
|
|
t -> {
|
|
|
|
|
// pre-check CircuitBreakerStatusCodeException's status matches input status
|
|
|
|
|
if (t instanceof CircuitBreakerStatusCodeException) {
|
|
|
|
|
HttpStatus status = ((CircuitBreakerStatusCodeException) t).getStatusCode();
|
|
|
|
|
// no need to fallback
|
|
|
|
|
if (!statuses.contains(status)) {
|
|
|
|
|
return Mono.error(t);
|
|
|
|
|
}
|
|
|
|
|
if (fallbackInfo.getBody() != null) {
|
|
|
|
|
byte[] bytes = fallbackInfo.getBody().getBytes(StandardCharsets.UTF_8);
|
|
|
|
|
DataBuffer buffer = exchange.getResponse().bufferFactory().wrap(bytes);
|
|
|
|
|
return exchange.getResponse().writeWith(Flux.just(buffer));
|
|
|
|
|
}
|
|
|
|
|
// do fallback
|
|
|
|
|
if (config.getFallbackUri() == null) {
|
|
|
|
|
// polaris checking
|
|
|
|
|
if (t instanceof CallAbortedException) {
|
|
|
|
|
CircuitBreakerStatus.FallbackInfo fallbackInfo = ((CallAbortedException) t).getFallbackInfo();
|
|
|
|
|
if (fallbackInfo != null) {
|
|
|
|
|
ServerHttpResponse response = exchange.getResponse();
|
|
|
|
|
response.setRawStatusCode(fallbackInfo.getCode());
|
|
|
|
|
if (fallbackInfo.getHeaders() != null) {
|
|
|
|
|
fallbackInfo.getHeaders().forEach((k, v) -> response.getHeaders().add(k, v));
|
|
|
|
|
}
|
|
|
|
|
DataBuffer bodyBuffer = null;
|
|
|
|
|
if (fallbackInfo.getBody() != null) {
|
|
|
|
|
byte[] bytes = fallbackInfo.getBody().getBytes(StandardCharsets.UTF_8);
|
|
|
|
|
bodyBuffer = response.bufferFactory().wrap(bytes);
|
|
|
|
|
}
|
|
|
|
|
return bodyBuffer != null ? response.writeWith(Flux.just(bodyBuffer)) : response.setComplete();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return chain.filter(exchange);
|
|
|
|
|
return Mono.error(t);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return Mono.error(t);
|
|
|
|
|
}
|
|
|
|
|
exchange.getResponse().setStatusCode(null);
|
|
|
|
|
reset(exchange);
|
|
|
|
|
|
|
|
|
|
exchange.getResponse().setStatusCode(null);
|
|
|
|
|
reset(exchange);
|
|
|
|
|
// TODO: copied from RouteToRequestUrlFilter
|
|
|
|
|
URI uri = exchange.getRequest().getURI();
|
|
|
|
|
// TODO: assume always?
|
|
|
|
|
boolean encoded = containsEncodedParts(uri);
|
|
|
|
|
URI requestUrl = UriComponentsBuilder.fromUri(uri).host(null).port(null)
|
|
|
|
|
.uri(config.getFallbackUri()).scheme(null).build(encoded).toUri();
|
|
|
|
|
exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, requestUrl);
|
|
|
|
|
addExceptionDetails(t, exchange);
|
|
|
|
|
|
|
|
|
|
// TODO: copied from RouteToRequestUrlFilter
|
|
|
|
|
URI uri = exchange.getRequest().getURI();
|
|
|
|
|
// TODO: assume always?
|
|
|
|
|
boolean encoded = containsEncodedParts(uri);
|
|
|
|
|
URI requestUrl = UriComponentsBuilder.fromUri(uri).host(null).port(null)
|
|
|
|
|
.uri(config.getFallbackUri()).scheme(null).build(encoded).toUri();
|
|
|
|
|
exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, requestUrl);
|
|
|
|
|
addExceptionDetails(t, exchange);
|
|
|
|
|
// Reset the exchange
|
|
|
|
|
reset(exchange);
|
|
|
|
|
|
|
|
|
|
// Reset the exchange
|
|
|
|
|
reset(exchange);
|
|
|
|
|
|
|
|
|
|
ServerHttpRequest request = exchange.getRequest().mutate().uri(requestUrl).build();
|
|
|
|
|
return getDispatcherHandler().handle(exchange.mutate().request(request).build());
|
|
|
|
|
}).onErrorResume(t -> handleErrorWithoutFallback(t, config.isResumeWithoutError()));
|
|
|
|
|
ServerHttpRequest request = exchange.getRequest().mutate().uri(requestUrl).build();
|
|
|
|
|
return getDispatcherHandler().handle(exchange.mutate().request(request).build());
|
|
|
|
|
})
|
|
|
|
|
.onErrorResume(t -> handleErrorWithoutFallback(t, config.isResumeWithoutError()));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
@ -245,6 +271,9 @@ public class PolarisCircuitBreakerFilterFactory extends SpringCloudCircuitBreake
|
|
|
|
|
if (t instanceof CallAbortedException) {
|
|
|
|
|
return Mono.error(new ServiceUnavailableException());
|
|
|
|
|
}
|
|
|
|
|
if (t instanceof CircuitBreakerStatusCodeException) {
|
|
|
|
|
return Mono.empty();
|
|
|
|
|
}
|
|
|
|
|
if (resumeWithoutError) {
|
|
|
|
|
return Mono.empty();
|
|
|
|
|
}
|
|
|
|
|