refactor:optimize performance.

Signed-off-by: Haotian Zhang <928016560@qq.com>
pull/1782/head
Haotian Zhang 1 week ago
parent 8ce1ebefc8
commit 35bfdaf41b

@ -108,10 +108,7 @@ public class EncodeTransferMedataFeignEnhancedPlugin implements EnhancedPlugin {
private void buildTransmittedHeader(Request request, Map<String, String> transHeaders) {
if (!CollectionUtils.isEmpty(transHeaders)) {
Map<String, Collection<String>> headers = getModifiableHeaders(request);
transHeaders.entrySet().stream().forEach(entry -> {
headers.remove(entry.getKey());
headers.put(entry.getKey(), Arrays.asList(entry.getValue()));
});
transHeaders.forEach((key, value) -> headers.put(key, Arrays.asList(value)));
}
}

@ -105,9 +105,7 @@ public class EncodeTransferMedataRestTemplateEnhancedPlugin implements EnhancedP
private void buildTransmittedHeader(HttpRequest request, Map<String, String> transHeaders) {
if (!CollectionUtils.isEmpty(transHeaders)) {
transHeaders.entrySet().stream().forEach(entry -> {
request.getHeaders().set(entry.getKey(), entry.getValue());
});
transHeaders.forEach((key, value) -> request.getHeaders().set(key, value));
}
}

@ -17,7 +17,6 @@
package com.tencent.cloud.metadata.core;
import java.util.Arrays;
import java.util.Enumeration;
import java.util.List;
import java.util.Map;
@ -53,18 +52,18 @@ public final class TransHeadersTransfer {
// transHeaderMetadata: for example, {"trans-headers" : {"header1,header2,header3":""}}
Map<String, String> transHeaderMetadata = MetadataContextHolder.get().getTransHeaders();
if (!CollectionUtils.isEmpty(transHeaderMetadata)) {
String transHeaders = transHeaderMetadata.keySet().stream().findFirst().orElse("");
String transHeaders = transHeaderMetadata.keySet().iterator().next();
String[] transHeaderArray = transHeaders.split(",");
Enumeration<String> httpHeaders = httpServletRequest.getHeaderNames();
while (httpHeaders.hasMoreElements()) {
String httpHeader = httpHeaders.nextElement();
Arrays.stream(transHeaderArray).forEach(transHeader -> {
for (String transHeader : transHeaderArray) {
if (transHeader.equals(httpHeader)) {
String httpHeaderValue = httpServletRequest.getHeader(httpHeader);
// for example, {"trans-headers-kv" : {"header1":"v1","header2":"v2"...}}
MetadataContextHolder.get().setTransHeadersKV(httpHeader, httpHeaderValue);
}
});
}
}
}
}
@ -79,19 +78,19 @@ public final class TransHeadersTransfer {
// transHeaderMetadata: for example, {"trans-headers" : {"header1,header2,header3":""}}
Map<String, String> transHeaderMetadata = MetadataContextHolder.get().getTransHeaders();
if (!CollectionUtils.isEmpty(transHeaderMetadata)) {
String transHeaders = transHeaderMetadata.keySet().stream().findFirst().orElse("");
String transHeaders = transHeaderMetadata.keySet().iterator().next();
String[] transHeaderArray = transHeaders.split(",");
HttpHeaders headers = serverHttpRequest.getHeaders();
Set<String> headerKeys = headers.keySet();
for (String httpHeader : headerKeys) {
Arrays.stream(transHeaderArray).forEach(transHeader -> {
for (String transHeader : transHeaderArray) {
if (transHeader.equals(httpHeader)) {
List<String> list = headers.get(httpHeader);
String httpHeaderValue = JacksonUtils.serialize2Json(list);
// for example, {"trans-headers-kv" : {"header1":"v1","header2":"v2"...}}
MetadataContextHolder.get().setTransHeadersKV(httpHeader, httpHeaderValue);
}
});
}
}
}
}

@ -137,7 +137,7 @@ public final class PolarisConfigListenerContext {
Map<String, Object> origin = new HashMap<>(properties.asMap());
Map<String, ConfigPropertyChangeInfo> deleted = new HashMap<>();
origin.keySet().parallelStream().forEach(key -> {
origin.keySet().forEach(key -> {
if (!ret.containsKey(key)) {
deleted.put(key, new ConfigPropertyChangeInfo(key, String.valueOf(origin.get(key)), null, DELETED));
properties.invalidate(key);
@ -145,7 +145,7 @@ public final class PolarisConfigListenerContext {
});
changes.putAll(deleted);
ret.keySet().parallelStream().forEach(key -> {
ret.keySet().forEach(key -> {
Object oldValue = properties.getIfPresent(key);
Object newValue = ret.get(key);
if (newValue == null) {
@ -192,7 +192,7 @@ public final class PolarisConfigListenerContext {
for (ConfigChangeListener listener : listeners) {
Set<String> interestedChangedKeys = resolveInterestedChangedKeys(listener, changedKeys);
Map<String, ConfigPropertyChangeInfo> modifiedChanges = new HashMap<>(interestedChangedKeys.size());
interestedChangedKeys.parallelStream().forEach(key -> modifiedChanges.put(key, changes.get(key)));
interestedChangedKeys.forEach(key -> modifiedChanges.put(key, changes.get(key)));
ConfigChangeEvent event = new ConfigChangeEvent(modifiedChanges, interestedChangedKeys);
if (listener instanceof SyncConfigChangeListener) {

@ -37,11 +37,11 @@ public class MockedConfigKVFile implements ConfigKVFile {
private final Map<String, Object> properties;
private String fileName;
private String fileName = "MockedFileName";
private String fileGroup;
private String fileGroup = "MockedFileGroup";
private String namespace;
private String namespace = "MockedNamespace";
private final List<ConfigKVFileChangeListener> listeners = new ArrayList<>();

@ -57,7 +57,7 @@ public class ConsulHeartbeatProperties {
int ttlMinus1 = ttlValue - 1;
double min = Math.min(ttlMinus1, max);
Duration heartbeatInterval = Duration.ofMillis(Math.round(1000 * min));
LOGGER.debug("Computed heartbeatInterval: " + heartbeatInterval);
LOGGER.debug("Computed heartbeatInterval: {}", heartbeatInterval);
return heartbeatInterval;
}

@ -88,7 +88,10 @@ public abstract class AbstractPolarisLoadBalancer implements ReactorServiceInsta
try {
ProcessLoadBalanceResponse response = routerAPI.processLoadBalance(req);
log.debug("loadbalancer choose:" + response.getTargetInstance().getHost() + ":" + response.getTargetInstance().getPort());
if (log.isDebugEnabled()) {
log.debug("loadbalancer choose:{}:{}", response.getTargetInstance()
.getHost(), response.getTargetInstance().getPort());
}
return new DefaultResponse(new PolarisServiceInstance(response.getTargetInstance()));
}
catch (Exception e) {

@ -20,12 +20,10 @@ package com.tencent.cloud.polaris.router;
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import com.tencent.cloud.common.constant.RouterConstant;
import com.tencent.cloud.common.metadata.MetadataContext;
@ -40,6 +38,7 @@ import com.tencent.polaris.api.exception.PolarisException;
import com.tencent.polaris.api.pojo.Instance;
import com.tencent.polaris.api.pojo.ServiceInfo;
import com.tencent.polaris.api.pojo.ServiceInstances;
import com.tencent.polaris.api.utils.CollectionUtils;
import com.tencent.polaris.router.api.core.RouterAPI;
import com.tencent.polaris.router.api.rpc.ProcessRoutersRequest;
import com.tencent.polaris.router.api.rpc.ProcessRoutersResponse;
@ -52,7 +51,6 @@ import org.springframework.cloud.client.loadbalancer.RequestDataContext;
import org.springframework.cloud.loadbalancer.core.DelegatingServiceInstanceListSupplier;
import org.springframework.cloud.loadbalancer.core.ServiceInstanceListSupplier;
import org.springframework.http.HttpHeaders;
import org.springframework.util.CollectionUtils;
import static com.tencent.cloud.common.constant.ContextConstant.UTF_8;
@ -113,62 +111,64 @@ public class PolarisRouterServiceInstanceListSupplier extends DelegatingServiceI
}
PolarisRouterContext buildRouterContext(HttpHeaders headers) {
Collection<String> labelHeaderValues = headers.get(RouterConstant.ROUTER_LABEL_HEADER);
if (CollectionUtils.isEmpty(labelHeaderValues)) {
labelHeaderValues = new ArrayList<>();
}
PolarisRouterContext routerContext = new PolarisRouterContext();
List<String> labelHeaderValues = headers.get(RouterConstant.ROUTER_LABEL_HEADER);
Map<String, String> labelHeaderValuesMap = new HashMap<>();
try {
Optional<String> labelHeaderValuesOptional = labelHeaderValues.stream().findFirst();
if (labelHeaderValuesOptional.isPresent()) {
String labelHeaderValuesContent = labelHeaderValuesOptional.get();
labelHeaderValuesMap.putAll(
JacksonUtils.deserialize2Map(URLDecoder.decode(labelHeaderValuesContent, UTF_8)));
if (!CollectionUtils.isEmpty(labelHeaderValues)) {
try {
String labelHeaderValuesContent = labelHeaderValues.get(0);
Map<String, String> map = JacksonUtils.deserialize2Map(URLDecoder.decode(labelHeaderValuesContent, UTF_8));
labelHeaderValuesMap.putAll(map);
}
catch (UnsupportedEncodingException e) {
throw new RuntimeException("unsupported charset exception " + UTF_8);
}
}
catch (UnsupportedEncodingException e) {
throw new RuntimeException("unsupported charset exception " + UTF_8);
}
PolarisRouterContext routerContext = new PolarisRouterContext();
routerContext.putLabels(RouterConstant.ROUTER_LABELS, labelHeaderValuesMap);
return routerContext;
}
Flux<List<ServiceInstance>> doRouter(Flux<List<ServiceInstance>> allServers, PolarisRouterContext routerContext) {
ServiceInstances serviceInstances = RouterUtils.transferServersToServiceInstances(allServers, instanceTransformer);
List<ServiceInstance> filteredInstances = new ArrayList<>();
if (serviceInstances.getInstances().size() > 0) {
// filter instance by routers
ProcessRoutersRequest processRoutersRequest = buildProcessRoutersRequest(serviceInstances, routerContext);
// process request interceptors
processRouterRequestInterceptors(processRoutersRequest, routerContext);
// process router chain
ProcessRoutersResponse processRoutersResponse = routerAPI.processRouters(processRoutersRequest);
// process response interceptors
processRouterResponseInterceptors(routerContext, processRoutersResponse);
// transfer polaris server to ServiceInstance
ServiceInstances filteredServiceInstances = processRoutersResponse.getServiceInstances();
for (Instance instance : filteredServiceInstances.getInstances()) {
filteredInstances.add(new PolarisServiceInstance(instance));
}
}
return Flux.fromIterable(Collections.singletonList(filteredInstances));
MetadataContext metadataContext = MetadataContextHolder.get();
return RouterUtils.transferServersToServiceInstances(allServers, instanceTransformer)
.flatMapMany(serviceInstances -> {
List<ServiceInstance> filteredInstances = new ArrayList<>();
if (CollectionUtils.isNotEmpty(serviceInstances.getInstances())) {
// filter instance by routers
ProcessRoutersRequest processRoutersRequest = buildProcessRoutersRequest(
serviceInstances, routerContext, metadataContext);
// process request interceptors
processRouterRequestInterceptors(processRoutersRequest, routerContext);
// process router chain
ProcessRoutersResponse processRoutersResponse = routerAPI.processRouters(processRoutersRequest);
// process response interceptors
processRouterResponseInterceptors(routerContext, processRoutersResponse);
// transfer polaris server to ServiceInstance
ServiceInstances filteredServiceInstances = processRoutersResponse.getServiceInstances();
for (Instance instance : filteredServiceInstances.getInstances()) {
filteredInstances.add(new PolarisServiceInstance(instance));
}
}
return Flux.fromIterable(Collections.singletonList(filteredInstances));
});
}
ProcessRoutersRequest buildProcessRoutersRequest(ServiceInstances serviceInstances, PolarisRouterContext key) {
ProcessRoutersRequest buildProcessRoutersRequest(ServiceInstances serviceInstances, PolarisRouterContext key,
MetadataContext metadataContext) {
ProcessRoutersRequest processRoutersRequest = new ProcessRoutersRequest();
processRoutersRequest.setDstInstances(serviceInstances);
ServiceInfo serviceInfo = new ServiceInfo();
serviceInfo.setNamespace(MetadataContext.LOCAL_NAMESPACE);
serviceInfo.setService(MetadataContext.LOCAL_SERVICE);
processRoutersRequest.setSourceService(serviceInfo);
processRoutersRequest.setMetadataContainerGroup(MetadataContextHolder.get().getMetadataContainerGroup(false));
processRoutersRequest.setMetadataContainerGroup(metadataContext.getMetadataContainerGroup(false));
processRoutersRequest.setMetadataContext(metadataContext);
return processRoutersRequest;
}

@ -17,26 +17,20 @@
package com.tencent.cloud.polaris.router;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import com.tencent.cloud.common.constant.MetadataConstant;
import com.tencent.cloud.common.metadata.MetadataContext;
import com.tencent.cloud.common.metadata.MetadataContextHolder;
import com.tencent.cloud.rpc.enhancement.transformer.InstanceTransformer;
import com.tencent.polaris.api.pojo.DefaultServiceInstances;
import com.tencent.polaris.api.pojo.Instance;
import com.tencent.polaris.api.pojo.ServiceInstances;
import com.tencent.polaris.api.pojo.ServiceKey;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.util.CollectionUtils;
@ -47,9 +41,6 @@ import org.springframework.util.CollectionUtils;
* @author lepdou 2022-05-17
*/
public final class RouterUtils {
private static final Logger LOGGER = LoggerFactory.getLogger(RouterUtils.class);
private static final int WAIT_TIME = 3;
private RouterUtils() {
}
@ -60,25 +51,26 @@ public final class RouterUtils {
* @param servers servers
* @return ServiceInstances
*/
public static ServiceInstances transferServersToServiceInstances(Flux<List<ServiceInstance>> servers, InstanceTransformer instanceTransformer) {
List<Instance> instanceList = Collections.synchronizedList(new ArrayList<>());
servers.flatMap((Function<List<ServiceInstance>, Publisher<?>>) serviceInstances ->
Flux.fromIterable(serviceInstances.stream()
.map(instanceTransformer::transform)
.collect(Collectors.toList()))).subscribe(instance -> instanceList.add((Instance) instance));
String serviceName = "";
Map<String, String> serviceMetadata = new HashMap<>();
if (!CollectionUtils.isEmpty(instanceList)) {
serviceName = instanceList.get(0).getService();
serviceMetadata = instanceList.get(0).getServiceMetadata();
}
public static Mono<ServiceInstances> transferServersToServiceInstances(
Flux<List<ServiceInstance>> servers, InstanceTransformer instanceTransformer) {
return servers.flatMapIterable(Function.identity())
.map(instanceTransformer::transform)
.collectList()
.map(instanceList -> {
String serviceName = "";
Map<String, String> serviceMetadata = new HashMap<>();
if (!CollectionUtils.isEmpty(instanceList)) {
serviceName = instanceList.get(0).getService();
serviceMetadata = instanceList.get(0).getServiceMetadata();
}
String namespace = MetadataContextHolder.get().getContextWithDefault(MetadataContext.FRAGMENT_APPLICATION_NONE,
MetadataConstant.POLARIS_TARGET_NAMESPACE, MetadataContext.LOCAL_NAMESPACE);
String namespace = MetadataContextHolder.get()
.getContextWithDefault(MetadataContext.FRAGMENT_APPLICATION_NONE,
MetadataConstant.POLARIS_TARGET_NAMESPACE, MetadataContext.LOCAL_NAMESPACE);
ServiceKey serviceKey = new ServiceKey(namespace, serviceName);
ServiceKey serviceKey = new ServiceKey(namespace, serviceName);
return new DefaultServiceInstances(serviceKey, instanceList, serviceMetadata);
return new DefaultServiceInstances(serviceKey, instanceList, serviceMetadata);
});
}
}

@ -17,7 +17,6 @@
package com.tencent.cloud.polaris.router.interceptor;
import com.tencent.cloud.common.metadata.MetadataContextHolder;
import com.tencent.cloud.polaris.router.PolarisRouterContext;
import com.tencent.cloud.polaris.router.config.properties.PolarisMetadataRouterProperties;
import com.tencent.cloud.polaris.router.spi.RouterRequestInterceptor;
@ -46,7 +45,7 @@ public class MetadataRouterRequestInterceptor implements RouterRequestIntercepto
return;
}
// set metadata router label keys
MetadataContainer metadataContainer = MetadataContextHolder.get()
MetadataContainer metadataContainer = request.getMetadataContext()
.getMetadataContainer(MetadataType.CUSTOM, false);
String metadataRouteKeys = metadataContainer.getRawMetadataStringValue(LABEL_KEY_METADATA_ROUTER_KEYS);
metadataContainer.putMetadataMapValue(MetadataRouter.ROUTER_TYPE_METADATA, MetadataRouter.KEY_METADATA_KEYS, metadataRouteKeys, TransitiveType.NONE);

@ -17,7 +17,6 @@
package com.tencent.cloud.polaris.router.interceptor;
import com.tencent.cloud.common.metadata.MetadataContextHolder;
import com.tencent.cloud.polaris.router.PolarisRouterContext;
import com.tencent.cloud.polaris.router.config.properties.PolarisNamespaceRouterProperties;
import com.tencent.cloud.polaris.router.spi.RouterRequestInterceptor;
@ -43,7 +42,7 @@ public class NamespaceRouterRequestInterceptor implements RouterRequestIntercept
@Override
public void apply(ProcessRoutersRequest request, PolarisRouterContext routerContext) {
// set namespace router enable
MetadataContainer metadataContainer = MetadataContextHolder.get()
MetadataContainer metadataContainer = request.getMetadataContext()
.getMetadataContainer(MetadataType.CUSTOM, false);
metadataContainer.putMetadataMapValue(NamespaceRouter.ROUTER_TYPE_NAMESPACE, NamespaceRouter.ROUTER_ENABLED,
String.valueOf(polarisNamespaceRouterProperties.isEnabled()), TransitiveType.NONE);

@ -17,7 +17,6 @@
package com.tencent.cloud.polaris.router.interceptor;
import com.tencent.cloud.common.metadata.MetadataContextHolder;
import com.tencent.cloud.polaris.router.PolarisRouterContext;
import com.tencent.cloud.polaris.router.config.properties.PolarisNearByRouterProperties;
import com.tencent.cloud.polaris.router.spi.RouterRequestInterceptor;
@ -43,7 +42,7 @@ public class NearbyRouterRequestInterceptor implements RouterRequestInterceptor
public void apply(ProcessRoutersRequest request, PolarisRouterContext routerContext) {
// set nearby router enable
boolean nearbyRouterEnabled = polarisNearByRouterProperties.isEnabled();
MetadataContainer metadataContainer = MetadataContextHolder.get()
MetadataContainer metadataContainer = request.getMetadataContext()
.getMetadataContainer(MetadataType.CUSTOM, false);
metadataContainer.putMetadataMapValue(NearbyRouter.ROUTER_TYPE_NEAR_BY, NearbyRouter.ROUTER_ENABLED, String.valueOf(nearbyRouterEnabled), TransitiveType.NONE);
}

@ -17,7 +17,6 @@
package com.tencent.cloud.polaris.router.interceptor;
import com.tencent.cloud.common.metadata.MetadataContextHolder;
import com.tencent.cloud.polaris.router.PolarisRouterContext;
import com.tencent.cloud.polaris.router.config.properties.PolarisRuleBasedRouterProperties;
import com.tencent.cloud.polaris.router.spi.RouterRequestInterceptor;
@ -43,7 +42,7 @@ public class RuleBasedRouterRequestInterceptor implements RouterRequestIntercept
public void apply(ProcessRoutersRequest request, PolarisRouterContext routerContext) {
// set rule based router enable
boolean ruleBasedRouterEnabled = polarisRuleBasedRouterProperties.isEnabled();
MetadataContainer metadataContainer = MetadataContextHolder.get()
MetadataContainer metadataContainer = request.getMetadataContext()
.getMetadataContainer(MetadataType.CUSTOM, false);
metadataContainer.putMetadataMapValue(RuleBasedRouter.ROUTER_TYPE_RULE_BASED, RuleBasedRouter.ROUTER_ENABLED, String.valueOf(ruleBasedRouterEnabled), TransitiveType.NONE);
// set rule based router fail over type.

@ -48,6 +48,7 @@ import com.tencent.polaris.api.pojo.ServiceKey;
import com.tencent.polaris.metadata.core.MetadataContainer;
import com.tencent.polaris.metadata.core.MetadataType;
import com.tencent.polaris.metadata.core.TransitiveType;
import com.tencent.polaris.metadata.core.manager.MetadataContainerGroup;
import com.tencent.polaris.plugins.router.metadata.MetadataRouter;
import com.tencent.polaris.plugins.router.nearby.NearbyRouter;
import com.tencent.polaris.plugins.router.rule.RuleBasedRouter;
@ -62,6 +63,7 @@ import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.loadbalancer.DefaultRequest;
@ -128,7 +130,7 @@ public class PolarisRouterServiceInstanceListSupplierTest {
ServiceInstances serviceInstances = assembleServiceInstances();
PolarisRouterContext routerContext = assembleRouterContext();
ProcessRoutersRequest request = polarisSupplier.buildProcessRoutersRequest(serviceInstances, routerContext);
ProcessRoutersRequest request = polarisSupplier.buildProcessRoutersRequest(serviceInstances, routerContext, MetadataContextHolder.get());
polarisSupplier.processRouterRequestInterceptors(request, routerContext);
String result = metadataContainer.getRawMetadataMapValue(MetadataRouter.ROUTER_TYPE_METADATA, MetadataRouter.KEY_METADATA_KEYS);
@ -152,7 +154,7 @@ public class PolarisRouterServiceInstanceListSupplierTest {
ServiceInstances serviceInstances = assembleServiceInstances();
PolarisRouterContext routerContext = assembleRouterContext();
ProcessRoutersRequest request = polarisSupplier.buildProcessRoutersRequest(serviceInstances, routerContext);
ProcessRoutersRequest request = polarisSupplier.buildProcessRoutersRequest(serviceInstances, routerContext, MetadataContextHolder.get());
polarisSupplier.processRouterRequestInterceptors(request, routerContext);
MetadataContainer metadataContainer = MetadataContextHolder.get()
@ -178,7 +180,7 @@ public class PolarisRouterServiceInstanceListSupplierTest {
ServiceInstances serviceInstances = assembleServiceInstances();
PolarisRouterContext routerContext = assembleRouterContext();
ProcessRoutersRequest request = polarisSupplier.buildProcessRoutersRequest(serviceInstances, routerContext);
ProcessRoutersRequest request = polarisSupplier.buildProcessRoutersRequest(serviceInstances, routerContext, MetadataContextHolder.get());
polarisSupplier.processRouterRequestInterceptors(request, routerContext);
MetadataContainer metadataContainer = MetadataContextHolder.get()
@ -246,9 +248,8 @@ public class PolarisRouterServiceInstanceListSupplierTest {
mockedApplicationContextAwareUtils.when(() -> ApplicationContextAwareUtils.getProperties(anyString()))
.thenReturn(testCallerService);
MetadataContextHolder.set(new MetadataContext());
mockedApplicationContextAwareUtils.when(() -> delegate.get())
.thenReturn(assembleServers());
mockedApplicationContextAwareUtils.when(() -> routerAPI.processRouters(any()))
when(delegate.get()).thenReturn(assembleServers());
when(routerAPI.processRouters(any()))
.thenReturn(new ProcessRoutersResponse(new DefaultServiceInstances(null, new ArrayList<>())));
PolarisRouterServiceInstanceListSupplier polarisSupplier = new PolarisRouterServiceInstanceListSupplier(
@ -260,10 +261,92 @@ public class PolarisRouterServiceInstanceListSupplierTest {
.build();
RequestDataContext requestDataContext = new RequestDataContext(new RequestData(httpRequest), "blue");
DefaultRequest request = new DefaultRequest(requestDataContext);
assertThat(polarisSupplier.get(request)).isNotNull();
List<ServiceInstance> instances = polarisSupplier.get(request).blockFirst();
assertThat(instances).isNotNull();
}
}
@Test
public void testConcurrencyContext() {
// 1. Setup
int concurrency = 100;
// Mock delegate to return servers on a different thread to simulate async behavior
// Force thread switching to verify if ThreadLocal is correctly captured and propagated
when(delegate.get()).thenAnswer(inv ->
assembleServers().publishOn(Schedulers.newSingle("delegate-thread"))
);
// Mock RouterAPI
// Verification logic: Extract ID from request and put it into response for subsequent verification
when(routerAPI.processRouters(any())).thenAnswer(invocation -> {
ProcessRoutersRequest req = invocation.getArgument(0);
MetadataContainerGroup group = req.getMetadataContainerGroup();
MetadataContainer container = group.getCustomMetadataContainer();
// Get ID from request context
String reqId = container.getRawMetadataStringValue("req-id");
// Put ID into returned service instance Metadata
DefaultInstance instance = new DefaultInstance();
instance.setMetadata(Collections.singletonMap("req-id", reqId));
List<Instance> instances = new ArrayList<>();
instances.add(instance);
ServiceInstances serviceInstances = new DefaultServiceInstances(new ServiceKey(testNamespace, testCalleeService), instances);
return new ProcessRoutersResponse(serviceInstances);
});
// Use empty interceptor list to avoid complex Mock
PolarisRouterServiceInstanceListSupplier polarisSupplier = new PolarisRouterServiceInstanceListSupplier(
delegate, routerAPI, Collections.emptyList(), null, new PolarisInstanceTransformer());
// 2. Execute Concurrently
List<String> results = Flux.range(0, concurrency)
.parallel()
.runOn(Schedulers.parallel())
.flatMap(i -> {
String reqId = "id-" + i;
// Setup ThreadLocal: Set unique Context for each request
MetadataContext context = new MetadataContext();
context.getMetadataContainer(MetadataType.CUSTOM, false)
.putMetadataStringValue("req-id", reqId, TransitiveType.NONE);
MetadataContextHolder.set(context);
// Build Request
MockServerHttpRequest httpRequest = MockServerHttpRequest.get("/" + testCalleeService + "/users")
.build();
RequestDataContext requestDataContext = new RequestDataContext(new RequestData(httpRequest));
DefaultRequest request = new DefaultRequest(requestDataContext);
// Call & Verify
return polarisSupplier.get(request)
.map(instances -> {
if (instances.isEmpty()) {
return "empty";
}
ServiceInstance instance = instances.get(0);
String returnedId = instance.getMetadata().get("req-id");
// Verify if returned ID matches current request ID
if (reqId.equals(returnedId)) {
return "ok";
}
else {
return "error: expected " + reqId + " but got " + returnedId;
}
})
// Clear ThreadLocal
.doFinally(signal -> MetadataContextHolder.remove());
})
.sequential()
.collectList()
.block();
// 3. Verify All
assertThat(results).hasSize(concurrency);
assertThat(results).allMatch(s -> s.equals("ok"));
}
private void setTransitiveMetadata() {
if (initTransitiveMetadata.compareAndSet(false, true)) {
// mock transitive metadata

@ -59,7 +59,9 @@ public class RouterUtilsTest {
@Test
public void testTransferEmptyInstances() {
ServiceInstances serviceInstances = RouterUtils.transferServersToServiceInstances(Flux.empty(), new PolarisInstanceTransformer());
ServiceInstances serviceInstances = RouterUtils.transferServersToServiceInstances(Flux.empty(), new PolarisInstanceTransformer())
.block();
assertThat(serviceInstances).isNotNull();
assertThat(serviceInstances.getInstances()).isNotNull();
assertThat(serviceInstances.getInstances()).isEmpty();
}
@ -93,8 +95,9 @@ public class RouterUtilsTest {
instances.add(new PolarisServiceInstance(instance));
}
ServiceInstances serviceInstances = RouterUtils.transferServersToServiceInstances(Flux.just(instances), new PolarisInstanceTransformer());
ServiceInstances serviceInstances = RouterUtils.transferServersToServiceInstances(Flux.just(instances), new PolarisInstanceTransformer())
.block();
assertThat(serviceInstances).isNotNull();
assertThat(serviceInstances.getInstances()).isNotNull();
assertThat(serviceInstances.getInstances().size()).isEqualTo(instanceSize);

@ -17,7 +17,6 @@
package com.tencent.cloud.common.metadata;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
@ -25,6 +24,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import com.tencent.cloud.common.metadata.config.MetadataLocalProperties;
@ -259,58 +259,43 @@ public class StaticMetadataManager {
private void parseLocationMetadata(MetadataLocalProperties metadataLocalProperties,
List<InstanceMetadataProvider> instanceMetadataProviders) {
// resolve region info
if (!CollectionUtils.isEmpty(instanceMetadataProviders)) {
Set<String> providerRegions = instanceMetadataProviders.stream().map(InstanceMetadataProvider::getRegion)
.filter(region -> !StringUtils.isBlank(region)).collect(Collectors.toSet());
if (!CollectionUtils.isEmpty(providerRegions)) {
if (providerRegions.size() > 1) {
throw new IllegalArgumentException("Multiple Regions Provided in InstanceMetadataProviders");
}
region = providerRegions.iterator().next();
}
}
if (StringUtils.isBlank(region)) {
region = System.getenv(ENV_METADATA_REGION);
}
if (StringUtils.isBlank(region)) {
region = metadataLocalProperties.getContent().get(LOCATION_KEY_REGION);
}
region = resolveLocationInfo(instanceMetadataProviders, metadataLocalProperties, ENV_METADATA_REGION,
LOCATION_KEY_REGION, InstanceMetadataProvider::getRegion);
// resolve zone info
if (!CollectionUtils.isEmpty(instanceMetadataProviders)) {
Set<String> providerZones = instanceMetadataProviders.stream().map(InstanceMetadataProvider::getZone)
.filter(zone -> !StringUtils.isBlank(zone)).collect(Collectors.toSet());
if (!CollectionUtils.isEmpty(providerZones)) {
if (providerZones.size() > 1) {
throw new IllegalArgumentException("Multiple Zones Provided in InstanceMetadataProviders");
}
zone = providerZones.iterator().next();
}
}
if (StringUtils.isBlank(zone)) {
zone = System.getenv(ENV_METADATA_ZONE);
}
if (StringUtils.isBlank(zone)) {
zone = metadataLocalProperties.getContent().get(LOCATION_KEY_ZONE);
}
zone = resolveLocationInfo(instanceMetadataProviders, metadataLocalProperties, ENV_METADATA_ZONE,
LOCATION_KEY_ZONE, InstanceMetadataProvider::getZone);
// resolve campus info
if (!CollectionUtils.isEmpty(instanceMetadataProviders)) {
Set<String> providerCampus = instanceMetadataProviders.stream().map(InstanceMetadataProvider::getCampus)
.filter(campus -> !StringUtils.isBlank(campus)).collect(Collectors.toSet());
if (!CollectionUtils.isEmpty(providerCampus)) {
if (providerCampus.size() > 1) {
throw new IllegalArgumentException("Multiple Campus Provided in InstanceMetadataProviders");
campus = resolveLocationInfo(instanceMetadataProviders, metadataLocalProperties, ENV_METADATA_CAMPUS,
LOCATION_KEY_CAMPUS, InstanceMetadataProvider::getCampus);
}
private String resolveLocationInfo(List<InstanceMetadataProvider> providers,
MetadataLocalProperties properties, String envKey, String propKey,
Function<InstanceMetadataProvider, String> extractor) {
// 1. Try providers
if (!CollectionUtils.isEmpty(providers)) {
Set<String> values = providers.stream()
.map(extractor)
.filter(StringUtils::isNotBlank)
.collect(Collectors.toSet());
if (!CollectionUtils.isEmpty(values)) {
if (values.size() > 1) {
throw new IllegalArgumentException("Multiple values provided for " + propKey);
}
campus = providerCampus.iterator().next();
return values.iterator().next();
}
}
if (StringUtils.isBlank(campus)) {
campus = System.getenv(ENV_METADATA_CAMPUS);
}
if (StringUtils.isBlank(campus)) {
campus = metadataLocalProperties.getContent().get(LOCATION_KEY_CAMPUS);
// 2. Try environment variable
String value = System.getenv(envKey);
if (StringUtils.isNotBlank(value)) {
return value;
}
// 3. Try properties
return properties.getContent().get(propKey);
}
public Map<String, String> getAllEnvMetadata() {
@ -341,7 +326,7 @@ public class StaticMetadataManager {
transHeaderSet.addAll(transHeaderFromEnvSet);
transHeaderSet.addAll(transHeaderFromConfigSet);
return new ArrayList<>(transHeaderSet).stream().sorted().collect(Collectors.joining(","));
return transHeaderSet.stream().sorted().collect(Collectors.joining(","));
}
public Map<String, String> getEnvTransitiveMetadata() {

@ -19,7 +19,10 @@ package com.tencent.cloud.quickstart.caller;
import java.util.Collections;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@ -324,4 +327,71 @@ public class QuickstartCallerController {
return "Polaris Discovery is not enabled. Please set spring.cloud.polaris.discovery.enabled=true";
}
}
/**
* Concurrent call QuickstartCalleeService with 10 threads for 5 minutes.
* @return execution result
*/
@GetMapping("/concurrent-load")
public String concurrentLoad() {
int threads = 30;
int minutes = 2;
long duration = minutes * 60 * 1000;
long endTime = System.currentTimeMillis() + duration;
Random random = new Random();
// Use a separate thread to manage the load test to avoid blocking the response
new Thread(() -> {
ExecutorService executorService = Executors.newFixedThreadPool(threads);
for (int i = 0; i < threads; i++) {
executorService.submit(() -> {
String url = "http://QuickStartGatewayService/QuickstartCalleeService/quickstart/callee/info";
while (System.currentTimeMillis() < endTime) {
HttpHeaders headers = new HttpHeaders();
boolean headerIf = random.nextBoolean();
if (headerIf) {
// MetadataContextHolder.get().putContext(FRAGMENT_NONE, "k1", "v1");
headers.add("X-Polaris-Metadata-Transitive-k1", "v1");
}
else {
// MetadataContextHolder.get().putContext(FRAGMENT_NONE, "k1", "v2");
headers.add("X-Polaris-Metadata-Transitive-k1", "v2");
}
// 创建 HttpEntity 实例并传入 HttpHeaders
HttpEntity<String> entity = new HttpEntity<>(headers);
// 使用 exchange 方法发送 GET 请求,并获取响应
try {
ResponseEntity<String> responseEntity = restTemplate.exchange(url, HttpMethod.GET, entity, String.class);
String body = responseEntity.getBody();
if (headerIf && !StringUtils.contains(body, "38081")) {
LOG.error("Error: " + body + " with headerIf: " + headerIf);
}
else if (!headerIf && !StringUtils.contains(body, "48084")) {
LOG.error("Error: " + body + " with headerIf: " + headerIf);
}
}
catch (HttpClientErrorException | HttpServerErrorException httpClientErrorException) {
// ignore
}
}
});
}
executorService.shutdown();
try {
if (!executorService.awaitTermination(6, TimeUnit.MINUTES)) {
executorService.shutdownNow();
}
}
catch (InterruptedException e) {
executorService.shutdownNow();
Thread.currentThread().interrupt();
}
LOG.info("Concurrent load test finished.");
}).start();
return "Concurrent load test started. " + threads + " threads will run for " + minutes + " minutes.";
}
}

@ -205,7 +205,7 @@ public class OAuthGatewayPlugin implements IGatewayPlugin<OAuthPlugin> {
// 这个requestUrl是从注册中心拿到服务列表并且balance之后带有IP信息的URL
// http://127.0.0.1:8080/group1/namespace1/Consumer-demo/echo-rest/1?user=1
URI requestUrl = this.reconstructURI(new OauthDelegatingServiceInstance(instance.getServer()), newUri);
logger.debug("LoadBalancerClientFilter url chosen: " + requestUrl);
logger.debug("LoadBalancerClientFilter url chosen: {}", requestUrl);
String result = sendAuthRequestByHttpMethod(paramsMap, headerParamsMap, requestUrl.toASCIIString(), tokenAuthMethod, timeout);
if (ClassUtils.isClassPresent("io.opentelemetry.context.Scope") && otScope instanceof Scope) {

Loading…
Cancel
Save