feat: support kafka lane. (#1791)

2023
shedfreewu 1 week ago committed by GitHub
parent d26a21081d
commit 2dd809094d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -53,3 +53,4 @@
- [feat: support traffic gray lane router](https://github.com/Tencent/spring-cloud-tencent/pull/1785)
- [fix: fix NPE when application context is null #1787](https://github.com/Tencent/spring-cloud-tencent/pull/1787)
- [fix: fix lane router property name.](https://github.com/Tencent/spring-cloud-tencent/pull/1789)
- [feat: support kafka lane.](https://github.com/Tencent/spring-cloud-tencent/pull/1791)

@ -23,6 +23,7 @@ import com.tencent.cloud.polaris.discovery.ConditionalOnPolarisDiscoveryEnabled;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
/**
* Configuration for listening the change of service status.
@ -41,8 +42,8 @@ public class PolarisRefreshConfiguration {
@Bean
@ConditionalOnMissingBean
public ServiceInstanceChangeCallbackManager serviceInstanceChangeCallbackManager() {
return new ServiceInstanceChangeCallbackManager();
public ServiceInstanceChangeCallbackManager serviceInstanceChangeCallbackManager(Environment environment) {
return new ServiceInstanceChangeCallbackManager(environment);
}
@Bean

@ -35,6 +35,7 @@ import reactor.util.annotation.NonNull;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.core.env.Environment;
import org.springframework.util.CollectionUtils;
/**
@ -50,7 +51,10 @@ public class ServiceInstanceChangeCallbackManager implements ApplicationListener
private final ScheduledThreadPoolExecutor serviceChangeListenerExecutor;
public ServiceInstanceChangeCallbackManager() {
private final Environment environment;
public ServiceInstanceChangeCallbackManager(Environment environment) {
this.environment = environment;
this.serviceChangeListenerExecutor = new ScheduledThreadPoolExecutor(4, new NamedThreadFactory("service-change-listener"));
}
@ -101,6 +105,19 @@ public class ServiceInstanceChangeCallbackManager implements ApplicationListener
if (clz.isAnnotationPresent(ServiceInstanceChangeListener.class)) {
ServiceInstanceChangeListener serviceInstanceChangeListener = clz.getAnnotation(ServiceInstanceChangeListener.class);
serviceName = serviceInstanceChangeListener.serviceName();
String message = null;
try {
serviceName = environment.resolveRequiredPlaceholders(serviceName);
}
catch (Exception e) {
// resolve failed, reset service name.
message = e.getMessage();
serviceName = null;
}
if (StringUtils.isBlank(serviceName)) {
LOG.warn("resolve service name failed, bean name:{}, config service name:{}, message:{}",
beanName, serviceInstanceChangeListener.serviceName(), message);
}
}
if (StringUtils.isBlank(serviceName)) {
@ -123,8 +140,8 @@ public class ServiceInstanceChangeCallbackManager implements ApplicationListener
@Override
public void onApplicationEvent(@NonNull ApplicationReadyEvent event) {
PolarisDiscoveryClient polarisDiscoveryClient = ApplicationContextAwareUtils.getBeanIfExists(PolarisDiscoveryClient.class);
PolarisReactiveDiscoveryClient polarisReactiveDiscoveryClient = ApplicationContextAwareUtils.getBeanIfExists(PolarisReactiveDiscoveryClient.class);
PolarisDiscoveryClient polarisDiscoveryClient = ApplicationContextAwareUtils.getBeanIfExists(PolarisDiscoveryClient.class, false);
PolarisReactiveDiscoveryClient polarisReactiveDiscoveryClient = ApplicationContextAwareUtils.getBeanIfExists(PolarisReactiveDiscoveryClient.class, false);
for (String serviceName : callbackMap.keySet()) {
try {
if (polarisDiscoveryClient != null) {

@ -0,0 +1,164 @@
/*
* Tencent is pleased to support the open source community by making spring-cloud-tencent available.
*
* Copyright (C) 2021 Tencent. All rights reserved.
*
* Licensed under the BSD 3-Clause License (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.tencent.cloud.polaris.discovery.refresh;
import java.lang.reflect.Field;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import com.tencent.cloud.polaris.registry.PolarisAutoServiceRegistration;
import com.tencent.polaris.api.pojo.Instance;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.Bean;
import org.springframework.test.context.junit.jupiter.SpringExtension;
import static org.assertj.core.api.Assertions.assertThat;
import static org.springframework.boot.test.context.SpringBootTest.WebEnvironment.RANDOM_PORT;
/**
* Test for {@link ServiceInstanceChangeCallback}.
*/
@ExtendWith(SpringExtension.class)
@SpringBootTest(webEnvironment = RANDOM_PORT,
classes = ServiceInstanceChangeCallbackTest.TestApplication.class,
properties = {"spring.config.location = classpath:application-test.yml",
"spring.main.web-application-type = servlet",
"spring.cloud.gateway.enabled = false"})
public class ServiceInstanceChangeCallbackTest {
@Autowired
ServiceInstanceChangeCallbackManager serviceInstanceChangeCallbackManager;
@Test
public void test1() {
// Get callbackMap from serviceInstanceChangeCallbackManager via reflection
try {
Field callbackMapField = ServiceInstanceChangeCallbackManager.class.getDeclaredField("callbackMap");
callbackMapField.setAccessible(true);
ConcurrentHashMap<String, List<ServiceInstanceChangeCallback>> callbackMap =
(ConcurrentHashMap<String, List<ServiceInstanceChangeCallback>>) callbackMapField.get(serviceInstanceChangeCallbackManager);
// Verify
assertThat(callbackMap.containsKey("java_provider_test")).isTrue();
assertThat(callbackMap.containsKey("QuickstartCalleeService")).isTrue();
// ignore error and empty
assertThat(callbackMap.size()).isEqualTo(2);
}
catch (Exception e) {
throw new RuntimeException("Failed to get callbackMap via reflection", e);
}
}
@SpringBootApplication
protected static class TestApplication {
@Bean
public SelfServiceChangeCallback selfServiceChangeCallback() {
return new SelfServiceChangeCallback();
}
@Bean
public CalleeServiceChangeCallback calleeServiceChangeCallback() {
return new CalleeServiceChangeCallback();
}
@Bean
public ErrorServiceChangeCallback errorServiceChangeCallback() {
return new ErrorServiceChangeCallback();
}
@Bean
public EmptyServiceChangeCallback emptyServiceChangeCallback() {
return new EmptyServiceChangeCallback();
}
@Bean
public ParsingEmptyServiceChangeCallback parseEmptyServiceChangeCallback() {
return new ParsingEmptyServiceChangeCallback();
}
@Bean
public TestBeanPostProcessor testBeanPostProcessor() {
return new TestBeanPostProcessor();
}
}
@ServiceInstanceChangeListener(serviceName = "${spring.application.name}")
static class SelfServiceChangeCallback implements ServiceInstanceChangeCallback {
@Override
public void callback(List<Instance> currentServiceInstances, List<Instance> addServiceInstances, List<Instance> deleteServiceInstances) {
}
}
@ServiceInstanceChangeListener(serviceName = "${error.name}")
static class ErrorServiceChangeCallback implements ServiceInstanceChangeCallback {
@Override
public void callback(List<Instance> currentServiceInstances, List<Instance> addServiceInstances, List<Instance> deleteServiceInstances) {
}
}
@ServiceInstanceChangeListener(serviceName = "${test.empty}")
static class ParsingEmptyServiceChangeCallback implements ServiceInstanceChangeCallback {
@Override
public void callback(List<Instance> currentServiceInstances, List<Instance> addServiceInstances, List<Instance> deleteServiceInstances) {
}
}
@ServiceInstanceChangeListener(serviceName = "")
static class EmptyServiceChangeCallback implements ServiceInstanceChangeCallback {
@Override
public void callback(List<Instance> currentServiceInstances, List<Instance> addServiceInstances, List<Instance> deleteServiceInstances) {
}
}
@ServiceInstanceChangeListener(serviceName = "QuickstartCalleeService")
static class CalleeServiceChangeCallback implements ServiceInstanceChangeCallback {
@Override
public void callback(List<Instance> currentServiceInstances, List<Instance> addServiceInstances, List<Instance> deleteServiceInstances) {
}
}
static class TestBeanPostProcessor implements BeanPostProcessor {
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
if (bean instanceof PolarisAutoServiceRegistration) {
return org.mockito.Mockito.mock(PolarisAutoServiceRegistration.class);
}
return bean;
}
}
}

@ -32,3 +32,5 @@ spring:
username: nacos
password: nacos
cluster-name: polaris
test:
empty:

@ -93,11 +93,17 @@ public class ApplicationContextAwareUtils implements ApplicationContextAware {
}
public static <T> T getBeanIfExists(Class<T> requiredType) {
return getBeanIfExists(requiredType, false);
}
public static <T> T getBeanIfExists(Class<T> requiredType, boolean warnIfFailed) {
try {
return applicationContext.getBean(requiredType);
}
catch (Throwable e) {
LOGGER.warn("get bean failed, bean type: {}", requiredType.getName());
if (warnIfFailed) {
LOGGER.warn("get bean failed, bean type: {}", requiredType.getName());
}
return null;
}
}

@ -226,6 +226,12 @@
<version>${revision}</version>
</dependency>
<dependency>
<groupId>com.tencent.cloud</groupId>
<artifactId>spring-cloud-starter-tencent-mq-plugin</artifactId>
<version>${revision}</version>
</dependency>
<!-- third part framework dependencies -->
<dependency>
<groupId>org.springdoc</groupId>

@ -26,6 +26,7 @@
<module>spring-cloud-starter-tencent-traffic-mirroring-plugin</module>
<module>spring-cloud-starter-tencent-fault-injection-plugin</module>
<module>spring-cloud-starter-tencent-tsf-tls-plugin</module>
<module>spring-cloud-starter-tencent-mq-plugin</module>
</modules>
</project>

@ -0,0 +1,54 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>spring-cloud-tencent-plugin-starters</artifactId>
<groupId>com.tencent.cloud</groupId>
<version>${revision}</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>spring-cloud-starter-tencent-mq-plugin</artifactId>
<name>Spring Cloud Starter Tencent Message Queue plugin</name>
<dependencies>
<dependency>
<groupId>com.tencent.cloud</groupId>
<artifactId>spring-cloud-starter-tencent-polaris-discovery</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-aop</artifactId>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</exclusion>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<scope>provided</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-json</artifactId>
<scope>provided</scope>
<optional>true</optional>
</dependency>
</dependencies>
</project>

@ -0,0 +1,53 @@
/*
* Tencent is pleased to support the open source community by making spring-cloud-tencent available.
*
* Copyright (C) 2021 Tencent. All rights reserved.
*
* Licensed under the BSD 3-Clause License (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.tencent.cloud.plugin.mq.lane;
import java.util.List;
import com.tencent.cloud.common.metadata.MetadataContext;
import com.tencent.polaris.api.plugin.registry.AbstractResourceEventListener;
import com.tencent.polaris.api.pojo.Instance;
import com.tencent.polaris.api.pojo.RegistryCacheValue;
import com.tencent.polaris.api.pojo.ServiceEventKey;
import com.tencent.polaris.api.utils.StringUtils;
import com.tencent.polaris.client.pojo.ServiceInstancesByProto;
public abstract class AbstractActiveLane extends AbstractResourceEventListener {
public abstract boolean ifConsume(String laneId, MqLaneProperties mqLaneProperties);
public abstract String getLaneHeaderKey();
public String formatLaneId(String laneId) {
return laneId;
}
public abstract void callback(List<Instance> currentServiceInstances);
@Override
public void onResourceUpdated(ServiceEventKey svcEventKey, RegistryCacheValue oldValue, RegistryCacheValue newValue) {
if (newValue.getEventType() == ServiceEventKey.EventType.INSTANCE
&& newValue instanceof ServiceInstancesByProto
&& StringUtils.equals(svcEventKey.getService(), MetadataContext.LOCAL_SERVICE)) {
ServiceInstancesByProto newIns = (ServiceInstancesByProto) newValue;
callback(newIns.getInstances());
}
}
}

@ -0,0 +1,55 @@
/*
* Tencent is pleased to support the open source community by making spring-cloud-tencent available.
*
* Copyright (C) 2021 Tencent. All rights reserved.
*
* Licensed under the BSD 3-Clause License (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.tencent.cloud.plugin.mq.lane;
import com.tencent.polaris.api.plugin.registry.AbstractResourceEventListener;
import com.tencent.polaris.api.pojo.RegistryCacheValue;
import com.tencent.polaris.api.pojo.ServiceEventKey;
public class LaneRuleListener extends AbstractResourceEventListener {
private final Runnable refreshAction;
public LaneRuleListener(Runnable refreshAction) {
this.refreshAction = refreshAction;
}
@Override
public void onResourceAdd(ServiceEventKey svcEventKey, RegistryCacheValue newValue) {
if (svcEventKey.getEventType() != ServiceEventKey.EventType.LANE_RULE) {
return;
}
refreshAction.run();
}
@Override
public void onResourceUpdated(ServiceEventKey svcEventKey, RegistryCacheValue oldValue, RegistryCacheValue newValue) {
if (svcEventKey.getEventType() != ServiceEventKey.EventType.LANE_RULE) {
return;
}
refreshAction.run();
}
@Override
public void onResourceDeleted(ServiceEventKey svcEventKey, RegistryCacheValue oldValue) {
if (svcEventKey.getEventType() != ServiceEventKey.EventType.LANE_RULE) {
return;
}
refreshAction.run();
}
}

@ -0,0 +1,29 @@
/*
* Tencent is pleased to support the open source community by making spring-cloud-tencent available.
*
* Copyright (C) 2021 Tencent. All rights reserved.
*
* Licensed under the BSD 3-Clause License (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.tencent.cloud.plugin.mq.lane;
public interface MqLaneProperties {
Boolean getLaneOn();
Boolean getLaneConsumeMain();
Boolean getMainConsumeLane();
Boolean getAutoSetHeader();
}

@ -0,0 +1,210 @@
/*
* Tencent is pleased to support the open source community by making spring-cloud-tencent available.
*
* Copyright (C) 2021 Tencent. All rights reserved.
*
* Licensed under the BSD 3-Clause License (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.tencent.cloud.plugin.mq.lane;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import com.tencent.cloud.common.metadata.MetadataContext;
import com.tencent.cloud.common.util.JacksonUtils;
import com.tencent.cloud.polaris.context.PolarisSDKContextManager;
import com.tencent.cloud.polaris.discovery.PolarisDiscoveryHandler;
import com.tencent.polaris.api.config.Configuration;
import com.tencent.polaris.api.config.consumer.ConsumerConfig;
import com.tencent.polaris.api.config.consumer.ServiceRouterConfig;
import com.tencent.polaris.api.plugin.compose.Extensions;
import com.tencent.polaris.api.pojo.Instance;
import com.tencent.polaris.api.pojo.ServiceKey;
import com.tencent.polaris.api.utils.CollectionUtils;
import com.tencent.polaris.api.utils.StringUtils;
import com.tencent.polaris.client.api.SDKContext;
import com.tencent.polaris.plugins.router.lane.BaseLaneMode;
import com.tencent.polaris.plugins.router.lane.LaneRouter;
import com.tencent.polaris.plugins.router.lane.LaneRouterConfig;
import com.tencent.polaris.plugins.router.lane.LaneUtils;
import com.tencent.polaris.specification.api.v1.traffic.manage.LaneProto;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.client.serviceregistry.Registration;
public class PolarisActiveLane extends AbstractActiveLane implements InitializingBean {
private static final Logger LOG = LoggerFactory.getLogger(PolarisActiveLane.class);
private final PolarisSDKContextManager polarisSDKContextManager;
private final PolarisDiscoveryHandler discoveryClient;
@Value("${spring.application.name:}")
private String springApplicationName;
/**
* current instance lane tag, related to baseLaneMode.
*/
private volatile String instanceLaneTag = "";
private volatile boolean serviceInLane = false;
private volatile List<LaneProto.LaneGroup> groups;
private Registration registration;
private BaseLaneMode baseLaneMode = BaseLaneMode.ONLY_UNTAGGED_INSTANCE;
public PolarisActiveLane(PolarisSDKContextManager polarisSDKContextManager,
PolarisDiscoveryHandler discoveryClient, Registration registration) {
this.polarisSDKContextManager = polarisSDKContextManager;
this.discoveryClient = discoveryClient;
this.registration = registration;
Optional.ofNullable(polarisSDKContextManager).map(PolarisSDKContextManager::getSDKContext)
.map(SDKContext::getExtensions).map(Extensions::getLocalRegistry)
.ifPresent(localRegistry -> localRegistry.registerResourceListener(new LaneRuleListener(this::freshLaneStatus)));
Optional.ofNullable(polarisSDKContextManager).map(PolarisSDKContextManager::getSDKContext)
.map(SDKContext::getExtensions).map(Extensions::getLocalRegistry)
.ifPresent(localRegistry -> localRegistry.registerResourceListener(this));
Optional.ofNullable(polarisSDKContextManager).map(PolarisSDKContextManager::getSDKContext)
.map(SDKContext::getConfig).map(Configuration::getConsumer).map(ConsumerConfig::getServiceRouter).ifPresent(serviceRouterConfig -> {
LaneRouterConfig laneRouterConfig = serviceRouterConfig.getPluginConfig(ServiceRouterConfig.DEFAULT_ROUTER_LANE, LaneRouterConfig.class);
if (laneRouterConfig != null) {
baseLaneMode = laneRouterConfig.getBaseLaneMode();
}
});
}
@Override
public void afterPropertiesSet() {
// get instances to trigger callback when instances change
discoveryClient.getHealthyInstances(springApplicationName);
}
@Override
public void callback(List<Instance> currentServiceInstances) {
if (LOG.isDebugEnabled()) {
LOG.debug("currentServices: {}", JacksonUtils.serialize2Json(currentServiceInstances));
}
freshWhenInstancesChange(currentServiceInstances);
if (LOG.isDebugEnabled()) {
LOG.debug("current instanceLaneTag:{}, serviceInLane: {}, baseLaneMode:{}", instanceLaneTag, serviceInLane, baseLaneMode);
}
}
private void freshWhenInstancesChange(List<Instance> currentServices) {
freshLaneStatus();
String tempInstanceLaneTag = "";
// get all active groups
if (CollectionUtils.isNotEmpty(currentServices)) {
for (Instance healthService : currentServices) {
if (StringUtils.equals(healthService.getId(), registration.getInstanceId())) {
tempInstanceLaneTag = healthService.getMetadata().get("lane");
break;
}
}
}
if (BaseLaneMode.ONLY_UNTAGGED_INSTANCE.equals(baseLaneMode)) {
instanceLaneTag = tempInstanceLaneTag;
}
else {
// if baseLaneMode is EXCLUDE_ENABLED_LANE_INSTANCE, check if the instance lane tag is in the lane
boolean laneTagExist = false;
for (LaneProto.LaneGroup group : getGroups()) {
for (LaneProto.LaneRule rule : group.getRulesList()) {
if (StringUtils.equals(rule.getDefaultLabelValue(), tempInstanceLaneTag)) {
laneTagExist = true;
}
}
}
instanceLaneTag = laneTagExist ? tempInstanceLaneTag : "";
}
}
/**
* update lane status.
*/
public void freshLaneStatus() {
ServiceKey localService = new ServiceKey(com.tencent.cloud.common.metadata.MetadataContext.LOCAL_NAMESPACE,
com.tencent.cloud.common.metadata.MetadataContext.LOCAL_SERVICE);
groups = LaneUtils.getLaneGroups(localService, polarisSDKContextManager.getSDKContext().getExtensions());
serviceInLane = CollectionUtils.isNotEmpty(groups);
}
public boolean currentInstanceInLane() {
return StringUtils.isNotEmpty(instanceLaneTag) && serviceInLane;
}
public String getInstanceLaneTag() {
return instanceLaneTag;
}
public List<LaneProto.LaneGroup> getGroups() {
return groups == null ? Collections.emptyList() : groups;
}
@Override
public boolean ifConsume(String messageLaneId, MqLaneProperties mqLaneProperties) {
// message has no lane id
if (StringUtils.isEmpty(messageLaneId)) {
if (!currentInstanceInLane()) {
// baseline service, consume directly
return true;
}
else {
// lane listener consumes baseline message
return mqLaneProperties.getLaneConsumeMain();
}
}
else {
LaneUtils.setCallerLaneId(messageLaneId);
// message has lane id
if (!currentInstanceInLane()) {
// baseline service
return mqLaneProperties.getMainConsumeLane();
}
else {
// whether the message lane id is the same as the lane id of the listener
for (LaneProto.LaneGroup group : getGroups()) {
for (LaneProto.LaneRule rule : group.getRulesList()) {
if (StringUtils.equals(messageLaneId, LaneUtils.buildStainLabel(rule))
&& StringUtils.equals(rule.getDefaultLabelValue(), getInstanceLaneTag())) {
return true;
}
}
}
return false;
}
}
}
@Override
public String getLaneHeaderKey() {
return MetadataContext.DEFAULT_TRANSITIVE_PREFIX + LaneRouter.TRAFFIC_STAIN_LABEL;
}
}

@ -0,0 +1,267 @@
/*
* Tencent is pleased to support the open source community by making spring-cloud-tencent available.
*
* Copyright (C) 2021 Tencent. All rights reserved.
*
* Licensed under the BSD 3-Clause License (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.tencent.cloud.plugin.mq.lane.kafka;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import com.tencent.cloud.common.metadata.MetadataContextHolder;
import com.tencent.cloud.plugin.mq.lane.AbstractActiveLane;
import com.tencent.cloud.polaris.context.PolarisSDKContextManager;
import com.tencent.polaris.api.utils.StringUtils;
import com.tencent.polaris.client.api.SDKContext;
import com.tencent.polaris.plugins.router.lane.LaneUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.annotation.Order;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
@Aspect
@Order(1)
public class KafkaLaneAspect {
private static final Logger LOG = LoggerFactory.getLogger(KafkaLaneAspect.class);
/**
* Empty object.
*/
public static final Object EMPTY_OBJECT = new Object();
private final PolarisSDKContextManager polarisSDKContextManager;
private final KafkaLaneProperties kafkaLaneProperties;
private final AbstractActiveLane activeLane;
private final String laneHeaderKey;
public KafkaLaneAspect(PolarisSDKContextManager polarisSDKContextManager, KafkaLaneProperties kafkaLaneProperties, AbstractActiveLane activeLane) {
this.polarisSDKContextManager = polarisSDKContextManager;
this.kafkaLaneProperties = kafkaLaneProperties;
this.activeLane = activeLane;
laneHeaderKey = activeLane.getLaneHeaderKey();
}
@Pointcut("execution(* org.springframework.kafka.core.KafkaTemplate.send(..))")
private void producerPointcut() {
}
@Around("producerPointcut()")
public Object aroundProducerMessage(ProceedingJoinPoint pjp) throws Throwable {
Object[] args = pjp.getArgs();
KafkaTemplate target = (KafkaTemplate) pjp.getTarget();
if (!this.kafkaLaneProperties.getLaneOn()) {
return pjp.proceed(args);
}
String laneId = LaneUtils.fetchLaneByCaller(
Optional.ofNullable(polarisSDKContextManager).map(PolarisSDKContextManager::getSDKContext).map(SDKContext::getExtensions).orElse(null),
com.tencent.cloud.common.metadata.MetadataContext.LOCAL_NAMESPACE,
com.tencent.cloud.common.metadata.MetadataContext.LOCAL_SERVICE);
if (StringUtils.isBlank(laneId) || !kafkaLaneProperties.getAutoSetHeader()) {
return pjp.proceed(args);
}
LOG.debug("kafka producer lane before, args: {}, thread laneId: {}", args, laneId);
try {
ProducerRecord producerRecord;
if (args.length == 1) {
// ListenableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record); ListenableFuture<SendResult<K, V>> send(Message<?> message);
if (args[0] instanceof Message) {
Message message = (Message) args[0];
producerRecord = target.getMessageConverter().fromMessage(message, target.getDefaultTopic());
// possibly no Jackson
if (!producerRecord.headers().iterator().hasNext()) {
byte[] correlationId = message.getHeaders().get(KafkaHeaders.CORRELATION_ID, byte[].class);
if (correlationId != null) {
producerRecord.headers().add(KafkaHeaders.CORRELATION_ID, correlationId);
}
}
}
else {
producerRecord = (ProducerRecord) args[0];
}
}
else if (args.length == 2) {
// ListenableFuture<SendResult<K, V>> send(String topic, V data);
producerRecord = new ProducerRecord<>((String) args[0], args[1]);
}
else if (args.length == 3) {
// ListenableFuture<SendResult<K, V>> send(String topic, K key, V data);
producerRecord = new ProducerRecord<>((String) args[0], args[1], args[2]);
}
else if (args.length == 4) {
// ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data);
producerRecord = new ProducerRecord<>((String) args[0], (Integer) args[1], args[2], args[3]);
}
else if (args.length == 5) {
// ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data);
producerRecord = new ProducerRecord<>((String) args[0], (Integer) args[1], (Long) args[2], args[3], args[4]);
}
else {
LOG.error("KafkaTemplate send message with wrong args: {}", args);
return pjp.proceed(args);
}
Header laneHeader = new RecordHeader(laneHeaderKey, laneId.getBytes(StandardCharsets.UTF_8));
producerRecord.headers().add(laneHeader);
LOG.debug("kafka producer lane after, args: {}, laneId: {}", producerRecord, laneId);
return target.send(producerRecord);
}
catch (Exception e) {
LOG.error("add laneId to kafka message error", e);
}
return pjp.proceed(args);
}
@Pointcut("@annotation(org.springframework.kafka.annotation.KafkaListener)")
private void consumerPointcut() {
}
@Around("consumerPointcut()")
public Object aroundConsumerMessage(ProceedingJoinPoint pjp) throws Throwable {
Object[] args = pjp.getArgs();
if (!this.kafkaLaneProperties.getLaneOn()) {
return pjp.proceed(args);
}
// init metadata context
MetadataContextHolder.get();
try {
ConsumerRecord consumerRecord = null;
List messageList = null;
int dataPosition = -1;
Acknowledgment acknowledgment = null;
for (int i = 0; i < args.length; i++) {
if ((args[i] instanceof Acknowledgment)) {
acknowledgment = (Acknowledgment) args[i];
}
else if ((args[i] instanceof ConsumerRecord)) {
consumerRecord = (ConsumerRecord) args[i];
dataPosition = i;
}
else if (args[i] instanceof List) {
messageList = (List) args[i];
dataPosition = i;
}
}
// parameter is message list, for batch consume
if (messageList != null) {
// empty list directly return
if (messageList.isEmpty()) {
return pjp.proceed(args);
}
// parameter is not consumerRecord, consume directly
if (!(messageList.get(0) instanceof ConsumerRecord)) {
return pjp.proceed(args);
}
List<ConsumerRecord> newMessageList = new ArrayList<>();
for (Object item : messageList) {
ConsumerRecord record = (ConsumerRecord) item;
String laneId = this.getConsumerRecordLaneId(record);
boolean ifConsume = this.ifConsume(laneId);
if (ifConsume) {
newMessageList.add(record);
}
else {
if (acknowledgment != null) {
acknowledgment.acknowledge();
}
if (LOG.isDebugEnabled()) {
LOG.debug("not need consume, laneId: {}, message:{}", laneId, record);
}
}
}
args[dataPosition] = newMessageList;
}
// parameter is consumerRecord
if (consumerRecord != null) {
String laneId = this.getConsumerRecordLaneId(consumerRecord);
boolean ifConsume = this.ifConsume(laneId);
if (!ifConsume) {
if (acknowledgment != null) {
acknowledgment.acknowledge();
}
if (LOG.isDebugEnabled()) {
LOG.debug("not need consume, laneId: {}, message:{}", laneId, consumerRecord);
}
return EMPTY_OBJECT;
}
}
}
catch (Exception e) {
LOG.error("extract laneId from kafka message error", e);
}
Object result = pjp.proceed(args);
LaneUtils.removeCallerLaneId();
return result;
}
String getConsumerRecordLaneId(ConsumerRecord consumerRecord) {
String laneId = null;
Iterator<Header> iterator = consumerRecord.headers().headers(laneHeaderKey).iterator();
if (iterator.hasNext()) {
laneId = new String(iterator.next().value(), StandardCharsets.UTF_8);
}
// falls back to the laneId from the metadata context (set by the user's aspect) if the header is empty
if (StringUtils.isBlank(laneId)) {
laneId = LaneUtils.getCallerLaneId();
}
// format laneId
laneId = activeLane.formatLaneId(laneId);
return laneId;
}
/**
* whether the message is consumed by the current listener.
* @param messageLaneId message lane id.
* @return whether to consume.
*/
boolean ifConsume(String messageLaneId) {
return activeLane.ifConsume(messageLaneId, kafkaLaneProperties);
}
}

@ -0,0 +1,58 @@
/*
* Tencent is pleased to support the open source community by making spring-cloud-tencent available.
*
* Copyright (C) 2021 Tencent. All rights reserved.
*
* Licensed under the BSD 3-Clause License (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.tencent.cloud.plugin.mq.lane.kafka;
import com.tencent.cloud.common.tsf.TsfContextUtils;
import com.tencent.cloud.plugin.mq.lane.AbstractActiveLane;
import com.tencent.cloud.plugin.mq.lane.PolarisActiveLane;
import com.tencent.cloud.plugin.mq.lane.tsf.TsfActiveLane;
import com.tencent.cloud.polaris.context.PolarisSDKContextManager;
import com.tencent.cloud.polaris.discovery.PolarisDiscoveryHandler;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.client.serviceregistry.Registration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties(KafkaLaneProperties.class)
public class KafkaLaneAspectConfiguration {
@Bean
@ConditionalOnClass(name = {"org.springframework.kafka.core.KafkaTemplate"})
public KafkaLaneAspect kafkaLaneAspect(PolarisSDKContextManager polarisSDKContextManager,
KafkaLaneProperties kafkaLaneProperties,
AbstractActiveLane activeLane) {
return new KafkaLaneAspect(polarisSDKContextManager, kafkaLaneProperties, activeLane);
}
@Bean
@ConditionalOnClass(name = {"org.springframework.kafka.core.KafkaTemplate"})
@ConditionalOnMissingBean
public AbstractActiveLane activeLane(PolarisSDKContextManager polarisSDKContextManager,
PolarisDiscoveryHandler discoveryClient, Registration registration) {
if (TsfContextUtils.isOnlyTsfConsulEnabled()) {
return new TsfActiveLane(polarisSDKContextManager, discoveryClient);
}
else {
return new PolarisActiveLane(polarisSDKContextManager, discoveryClient, registration);
}
}
}

@ -0,0 +1,82 @@
/*
* Tencent is pleased to support the open source community by making spring-cloud-tencent available.
*
* Copyright (C) 2021 Tencent. All rights reserved.
*
* Licensed under the BSD 3-Clause License (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.tencent.cloud.plugin.mq.lane.kafka;
import com.tencent.cloud.plugin.mq.lane.MqLaneProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
@ConfigurationProperties(prefix = "spring.cloud.polaris.lane.kafka")
public class KafkaLaneProperties implements MqLaneProperties {
/**
* enable kafka lane.
*/
private Boolean laneOn = false;
/**
* lane listener whether to consume main message.
* scene: message without lane label, listener service with lane label, force lane service to consume main message.
* default false.
*/
private Boolean laneConsumeMain = false;
/**
* main listener whether to consume lane message.
* scene: message with lane label, lane not deployed or not online, main consume lane message.
* default false.
*/
private Boolean mainConsumeLane = false;
/**
* whether to set lane id to message header.
*/
private Boolean autoSetHeader = true;
public Boolean getLaneOn() {
return laneOn;
}
public void setLaneOn(Boolean laneOn) {
this.laneOn = laneOn;
}
public Boolean getLaneConsumeMain() {
return laneConsumeMain;
}
public void setLaneConsumeMain(Boolean laneConsumeMain) {
this.laneConsumeMain = laneConsumeMain;
}
public Boolean getMainConsumeLane() {
return mainConsumeLane;
}
public void setMainConsumeLane(Boolean mainConsumeLane) {
this.mainConsumeLane = mainConsumeLane;
}
public Boolean getAutoSetHeader() {
return autoSetHeader;
}
public void setAutoSetHeader(Boolean autoSetHeader) {
this.autoSetHeader = autoSetHeader;
}
}

@ -0,0 +1,250 @@
/*
* Tencent is pleased to support the open source community by making spring-cloud-tencent available.
*
* Copyright (C) 2021 Tencent. All rights reserved.
*
* Licensed under the BSD 3-Clause License (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.tencent.cloud.plugin.mq.lane.tsf;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import com.tencent.cloud.common.util.JacksonUtils;
import com.tencent.cloud.plugin.mq.lane.AbstractActiveLane;
import com.tencent.cloud.plugin.mq.lane.LaneRuleListener;
import com.tencent.cloud.plugin.mq.lane.MqLaneProperties;
import com.tencent.cloud.polaris.context.PolarisSDKContextManager;
import com.tencent.cloud.polaris.discovery.PolarisDiscoveryHandler;
import com.tencent.polaris.api.plugin.compose.Extensions;
import com.tencent.polaris.api.pojo.Instance;
import com.tencent.polaris.api.pojo.ServiceKey;
import com.tencent.polaris.api.utils.StringUtils;
import com.tencent.polaris.client.api.SDKContext;
import com.tencent.polaris.metadata.core.constant.TsfMetadataConstants;
import com.tencent.polaris.plugins.router.lane.LaneUtils;
import com.tencent.polaris.specification.api.v1.traffic.manage.LaneProto;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Value;
public class TsfActiveLane extends AbstractActiveLane implements InitializingBean {
private static final Logger LOG = LoggerFactory.getLogger(TsfActiveLane.class);
private static final String TSF_LANE_ID = "tsf_laneId";
private final PolarisSDKContextManager polarisSDKContextManager;
private final PolarisDiscoveryHandler discoveryClient;
/**
* Online deployment groups for this service (same namespace id and application id required).
*/
private volatile Set<String> activeGroupSet = new HashSet<>();
private volatile Set<String> currentGroupLaneIds = new HashSet<>();
/**
* key: laneId.
* value: true - online, false - offline.
*/
private volatile Map<String, Boolean> laneActiveMap = new HashMap<>();
@Value("${tsf_namespace_id:}")
private String tsfNamespaceId;
@Value("${tsf_group_id:}")
private String tsfGroupId;
@Value("${tsf_application_id:}")
private String tsfApplicationId;
@Value("${spring.application.name:}")
private String springApplicationName;
public TsfActiveLane(PolarisSDKContextManager polarisSDKContextManager, PolarisDiscoveryHandler discoveryClient) {
this.polarisSDKContextManager = polarisSDKContextManager;
this.discoveryClient = discoveryClient;
Optional.ofNullable(polarisSDKContextManager).map(PolarisSDKContextManager::getSDKContext)
.map(SDKContext::getExtensions).map(Extensions::getLocalRegistry)
.ifPresent(localRegistry -> localRegistry.registerResourceListener(new LaneRuleListener(this::freshLaneStatus)));
Optional.ofNullable(polarisSDKContextManager).map(PolarisSDKContextManager::getSDKContext)
.map(SDKContext::getExtensions).map(Extensions::getLocalRegistry)
.ifPresent(localRegistry -> localRegistry.registerResourceListener(this));
}
@Override
public void afterPropertiesSet() {
// get instances to trigger callback when instances change
discoveryClient.getHealthyInstances(springApplicationName);
}
@Override
public void callback(List<Instance> currentServiceInstances) {
if (LOG.isDebugEnabled()) {
LOG.debug("currentServices: {}", JacksonUtils.serialize2Json(currentServiceInstances));
LOG.debug("current namespaceId: {}, groupId: {}, applicationId: {}", tsfNamespaceId, tsfGroupId, tsfApplicationId);
}
freshWhenInstancesChange(currentServiceInstances);
if (LOG.isDebugEnabled()) {
LOG.debug("current lane active status: {}", JacksonUtils.serialize2Json(laneActiveMap));
}
}
private void freshWhenInstancesChange(List<Instance> currentServices) {
if (currentServices == null || currentServices.isEmpty()) {
return;
}
Set<String> currentActiveGroupSet = new HashSet<>();
// get all active groups
for (Instance healthService : currentServices) {
String nsId = healthService.getMetadata().get("TSF_NAMESPACE_ID");
String groupId = healthService.getMetadata().get("TSF_GROUP_ID");
String applicationId = healthService.getMetadata().get("TSF_APPLICATION_ID");
if (StringUtils.equals(tsfNamespaceId, nsId) && StringUtils.equals(tsfApplicationId, applicationId) && StringUtils.isNotEmpty(groupId)) {
currentActiveGroupSet.add(groupId);
}
}
activeGroupSet = currentActiveGroupSet;
freshLaneStatus();
}
/**
* update lane status.
*/
public void freshLaneStatus() {
Map<String, Boolean> currentLaneActiveMap = new HashMap<>();
Set<String> tempCurrentGroupLaneIds = new HashSet<>();
ServiceKey localService = new ServiceKey(com.tencent.cloud.common.metadata.MetadataContext.LOCAL_NAMESPACE,
com.tencent.cloud.common.metadata.MetadataContext.LOCAL_SERVICE);
List<LaneProto.LaneGroup> groups = LaneUtils.getLaneGroups(localService, polarisSDKContextManager.getSDKContext().getExtensions());
for (LaneProto.LaneGroup laneGroup : groups) {
for (LaneProto.LaneRule laneRule : laneGroup.getRulesList()) {
// in tsf, if namespace id and application id are in lane rule, it means the service is in lane.
if (!laneGroup.getMetadataMap().containsKey(tsfNamespaceId + "," + tsfApplicationId)) {
continue;
}
// in tsf, lane label key is TsfMetadataConstants.TSF_GROUP_ID
if (!TsfMetadataConstants.TSF_GROUP_ID.equals(laneRule.getLabelKey())
|| StringUtils.isEmpty(laneRule.getDefaultLabelValue())) {
continue;
}
for (String groupId : laneRule.getDefaultLabelValue().split(",")) {
if (activeGroupSet.contains(groupId)) {
// active group, update lane active status
currentLaneActiveMap.put(laneRule.getId(), true);
}
else {
// inactive group, mark lane as inactive only if no other active group exists
currentLaneActiveMap.putIfAbsent(laneRule.getId(), false);
}
if (StringUtils.equals(groupId, tsfGroupId)) {
tempCurrentGroupLaneIds.add(laneRule.getId());
}
}
}
}
laneActiveMap = currentLaneActiveMap;
currentGroupLaneIds = tempCurrentGroupLaneIds;
}
public boolean isLaneExist(String laneId) {
return laneActiveMap.containsKey(laneId);
}
public boolean isActiveLane(String laneId) {
return laneActiveMap.getOrDefault(laneId, false);
}
public Set<String> getCurrentGroupLaneIds() {
return currentGroupLaneIds;
}
@Override
public boolean ifConsume(String originMessageLaneId, MqLaneProperties mqLaneProperties) {
String laneId = originMessageLaneId;
if (laneId != null && laneId.contains("/")) {
laneId = laneId.split("/")[1];
}
Set<String> groupLaneIdSet = getCurrentGroupLaneIds();
// message has no lane id
if (StringUtils.isEmpty(laneId)) {
if (groupLaneIdSet.isEmpty()) {
// baseline service, consume directly
return true;
}
else {
// lane listener consumes baseline message
return mqLaneProperties.getLaneConsumeMain();
}
}
else {
LaneUtils.setCallerLaneId(originMessageLaneId);
// message has lane id
if (groupLaneIdSet.isEmpty()) {
// baseline service
// message carries lane id but the current service's lane has no deployment groups, consume baseline
boolean consume = !isLaneExist(laneId);
// message carries lane id, but the current service's lane has deployment groups but is not active (or manually taken offline), consume baseline based on switch configuration, default is not to consume
consume = consume ||
(mqLaneProperties.getMainConsumeLane() &&
isLaneExist(laneId) &&
!isActiveLane(laneId)
);
return consume;
}
else {
return groupLaneIdSet.contains(laneId);
}
}
}
@Override
public String getLaneHeaderKey() {
return TSF_LANE_ID;
}
@Override
public String formatLaneId(String laneId) {
if (StringUtils.isEmpty(laneId)) {
return laneId;
}
if (!laneId.contains("/") && laneId.startsWith("lane-")) {
laneId = "tsf/" + laneId;
}
return laneId;
}
}

@ -0,0 +1,168 @@
/*
* Tencent is pleased to support the open source community by making spring-cloud-tencent available.
*
* Copyright (C) 2021 Tencent. All rights reserved.
*
* Licensed under the BSD 3-Clause License (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.tencent.cloud.plugin.mq.lane;
import com.tencent.cloud.plugin.mq.lane.tsf.TsfActiveLane;
import com.tencent.polaris.api.pojo.RegistryCacheValue;
import com.tencent.polaris.api.pojo.ServiceEventKey;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
/**
* Test for {@link LaneRuleListener}.
*/
public class LaneRuleListenerTest {
private TsfActiveLane tsfActiveLane;
private LaneRuleListener laneRuleListener;
@BeforeEach
public void setUp() {
tsfActiveLane = mock(TsfActiveLane.class);
laneRuleListener = new LaneRuleListener(tsfActiveLane::freshLaneStatus);
}
@Test
public void testConstructorInitialization() {
// Verify that constructor properly initializes the TsfActiveLane dependency
assert laneRuleListener != null;
}
@Test
public void testOnResourceAddWithLaneRuleEvent() {
// Given
ServiceEventKey svcEventKey = mock(ServiceEventKey.class);
RegistryCacheValue newValue = mock(RegistryCacheValue.class);
when(svcEventKey.getEventType()).thenReturn(ServiceEventKey.EventType.LANE_RULE);
// When
laneRuleListener.onResourceAdd(svcEventKey, newValue);
// Then
verify(tsfActiveLane).freshLaneStatus();
}
@Test
public void testOnResourceAddWithNonLaneRuleEvent() {
// Given
ServiceEventKey svcEventKey = mock(ServiceEventKey.class);
RegistryCacheValue newValue = mock(RegistryCacheValue.class);
when(svcEventKey.getEventType()).thenReturn(ServiceEventKey.EventType.INSTANCE);
// When
laneRuleListener.onResourceAdd(svcEventKey, newValue);
// Then
verify(tsfActiveLane, never()).freshLaneStatus();
}
@Test
public void testOnResourceUpdatedWithLaneRuleEvent() {
// Given
ServiceEventKey svcEventKey = mock(ServiceEventKey.class);
RegistryCacheValue oldValue = mock(RegistryCacheValue.class);
RegistryCacheValue newValue = mock(RegistryCacheValue.class);
when(svcEventKey.getEventType()).thenReturn(ServiceEventKey.EventType.LANE_RULE);
// When
laneRuleListener.onResourceUpdated(svcEventKey, oldValue, newValue);
// Then
verify(tsfActiveLane).freshLaneStatus();
}
@Test
public void testOnResourceUpdatedWithNonLaneRuleEvent() {
// Given
ServiceEventKey svcEventKey = mock(ServiceEventKey.class);
RegistryCacheValue oldValue = mock(RegistryCacheValue.class);
RegistryCacheValue newValue = mock(RegistryCacheValue.class);
when(svcEventKey.getEventType()).thenReturn(ServiceEventKey.EventType.SERVICE);
// When
laneRuleListener.onResourceUpdated(svcEventKey, oldValue, newValue);
// Then
verify(tsfActiveLane, never()).freshLaneStatus();
}
@Test
public void testOnResourceDeletedWithLaneRuleEvent() {
// Given
ServiceEventKey svcEventKey = mock(ServiceEventKey.class);
RegistryCacheValue oldValue = mock(RegistryCacheValue.class);
when(svcEventKey.getEventType()).thenReturn(ServiceEventKey.EventType.LANE_RULE);
// When
laneRuleListener.onResourceDeleted(svcEventKey, oldValue);
// Then
verify(tsfActiveLane).freshLaneStatus();
}
@Test
public void testOnResourceDeletedWithNonLaneRuleEvent() {
// Given
ServiceEventKey svcEventKey = mock(ServiceEventKey.class);
RegistryCacheValue oldValue = mock(RegistryCacheValue.class);
when(svcEventKey.getEventType()).thenReturn(ServiceEventKey.EventType.ROUTING);
// When
laneRuleListener.onResourceDeleted(svcEventKey, oldValue);
// Then
verify(tsfActiveLane, never()).freshLaneStatus();
}
@Test
public void testEventTypeCoverage() {
// Test all event types to ensure proper filtering
for (ServiceEventKey.EventType eventType : ServiceEventKey.EventType.values()) {
ServiceEventKey svcEventKey = mock(ServiceEventKey.class);
RegistryCacheValue value = mock(RegistryCacheValue.class);
when(svcEventKey.getEventType()).thenReturn(eventType);
// When
laneRuleListener.onResourceAdd(svcEventKey, value);
// Then
if (eventType == ServiceEventKey.EventType.LANE_RULE) {
verify(tsfActiveLane).freshLaneStatus();
}
else {
verify(tsfActiveLane, never()).freshLaneStatus();
}
// Reset mock for next iteration
Mockito.reset(tsfActiveLane);
}
}
}

@ -0,0 +1,181 @@
/*
* Tencent is pleased to support the open source community by making spring-cloud-tencent available.
*
* Copyright (C) 2021 Tencent. All rights reserved.
*
* Licensed under the BSD 3-Clause License (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.tencent.cloud.plugin.mq.lane;
import java.lang.reflect.Field;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import com.tencent.cloud.common.tsf.TsfContextUtils;
import com.tencent.cloud.common.util.JacksonUtils;
import com.tencent.cloud.plugin.mq.lane.kafka.KafkaLaneProperties;
import com.tencent.cloud.polaris.context.PolarisSDKContextManager;
import com.tencent.cloud.polaris.discovery.PolarisDiscoveryHandler;
import com.tencent.polaris.api.pojo.Instance;
import com.tencent.polaris.client.api.SDKContext;
import com.tencent.polaris.plugins.router.lane.BaseLaneMode;
import com.tencent.polaris.plugins.router.lane.LaneUtils;
import com.tencent.polaris.specification.api.v1.traffic.manage.LaneProto;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.springframework.cloud.client.serviceregistry.Registration;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
/**
* Test for {@link PolarisActiveLane}.
* Instance and service status change.
*/
public class PolarisActiveLaneTest {
private static final String CURRENT_INSTANCE_ID = "current-instance-id";
private KafkaLaneProperties kafkaLaneProperties;
private PolarisActiveLane polarisActiveLane;
private PolarisSDKContextManager polarisSDKContextManager;
private Registration registration;
private LaneProto.LaneGroup group;
private MockedStatic<LaneUtils> laneUtilsMockedStatic;
private MockedStatic<JacksonUtils> jacksonUtilsMockedStatic;
private MockedStatic<TsfContextUtils> tsfContextUtilsMockedStatic;
@BeforeEach
public void setUp() throws Exception {
group = mock(LaneProto.LaneGroup.class);
kafkaLaneProperties = new KafkaLaneProperties();
kafkaLaneProperties.setLaneOn(true);
polarisSDKContextManager = mock(PolarisSDKContextManager.class);
when(polarisSDKContextManager.getSDKContext()).thenReturn(mock(SDKContext.class));
registration = mock(Registration.class);
when(registration.getInstanceId()).thenReturn(CURRENT_INSTANCE_ID);
polarisActiveLane = new PolarisActiveLane(polarisSDKContextManager, mock(PolarisDiscoveryHandler.class), registration);
laneUtilsMockedStatic = Mockito.mockStatic(LaneUtils.class);
jacksonUtilsMockedStatic = Mockito.mockStatic(JacksonUtils.class);
tsfContextUtilsMockedStatic = Mockito.mockStatic(TsfContextUtils.class);
jacksonUtilsMockedStatic.when(() -> JacksonUtils.serialize2Json(any())).thenReturn("{}");
}
@AfterEach
public void tearDown() {
laneUtilsMockedStatic.close();
jacksonUtilsMockedStatic.close();
tsfContextUtilsMockedStatic.close();
}
@Test
public void testCallback() throws Throwable {
// not in lane
assertThat(polarisActiveLane.currentInstanceInLane()).isFalse();
// in lane
Map<String, String> metadata = new HashMap<>();
metadata.put("lane", "test-lane");
Instance instance = mock(Instance.class);
when(instance.getId()).thenReturn(CURRENT_INSTANCE_ID);
when(instance.getMetadata()).thenReturn(metadata);
laneUtilsMockedStatic.when(() -> LaneUtils.getLaneGroups(any(), any()))
.thenReturn(Collections.singletonList(group));
polarisActiveLane.callback(Collections.singletonList(instance));
assertThat(polarisActiveLane.currentInstanceInLane()).isTrue();
// instance not in lane
metadata.remove("lane");
polarisActiveLane.callback(Collections.singletonList(instance));
assertThat(polarisActiveLane.currentInstanceInLane()).isFalse();
// reset, instance in lane
metadata.put("lane", "test-lane");
polarisActiveLane.callback(Collections.singletonList(instance));
assertThat(polarisActiveLane.currentInstanceInLane()).isTrue();
// service not in lane
laneUtilsMockedStatic.when(() -> LaneUtils.getLaneGroups(any(), any())).thenReturn(Collections.emptyList());
polarisActiveLane.freshLaneStatus(); // rule listener will call this method
assertThat(polarisActiveLane.currentInstanceInLane()).isFalse();
}
@Test
public void testCallback_baseLaneMode1() throws Throwable {
Field baseLaneModeField = PolarisActiveLane.class.getDeclaredField("baseLaneMode");
baseLaneModeField.setAccessible(true);
baseLaneModeField.set(polarisActiveLane, BaseLaneMode.EXCLUDE_ENABLED_LANE_INSTANCE);
// not in lane
assertThat(polarisActiveLane.currentInstanceInLane()).isFalse();
// not in lane
Map<String, String> metadata = new HashMap<>();
metadata.put("lane", "lane-not-exist");
Instance instance = mock(Instance.class);
when(instance.getId()).thenReturn(CURRENT_INSTANCE_ID);
when(instance.getMetadata()).thenReturn(metadata);
laneUtilsMockedStatic.when(() -> LaneUtils.getLaneGroups(any(), any()))
.thenReturn(Collections.singletonList(group));
LaneProto.LaneRule rule = mock(LaneProto.LaneRule.class);
when(rule.getDefaultLabelValue()).thenReturn("lane-exist");
when(group.getRulesList()).thenReturn(Collections.singletonList(rule));
polarisActiveLane.callback(Collections.singletonList(instance));
assertThat(polarisActiveLane.currentInstanceInLane()).isFalse();
}
@Test
public void testCallback_baseLaneMode2() throws Throwable {
Field baseLaneModeField = PolarisActiveLane.class.getDeclaredField("baseLaneMode");
baseLaneModeField.setAccessible(true);
baseLaneModeField.set(polarisActiveLane, BaseLaneMode.EXCLUDE_ENABLED_LANE_INSTANCE);
// not in lane
assertThat(polarisActiveLane.currentInstanceInLane()).isFalse();
// in lane
Map<String, String> metadata = new HashMap<>();
metadata.put("lane", "lane-exist");
Instance instance = mock(Instance.class);
when(instance.getId()).thenReturn(CURRENT_INSTANCE_ID);
when(instance.getMetadata()).thenReturn(metadata);
laneUtilsMockedStatic.when(() -> LaneUtils.getLaneGroups(any(), any()))
.thenReturn(Collections.singletonList(group));
LaneProto.LaneRule rule = mock(LaneProto.LaneRule.class);
when(rule.getDefaultLabelValue()).thenReturn("lane-exist");
when(group.getRulesList()).thenReturn(Collections.singletonList(rule));
polarisActiveLane.callback(Collections.singletonList(instance));
assertThat(polarisActiveLane.currentInstanceInLane()).isTrue();
}
}

@ -0,0 +1,176 @@
/*
* Tencent is pleased to support the open source community by making spring-cloud-tencent available.
*
* Copyright (C) 2021 Tencent. All rights reserved.
*
* Licensed under the BSD 3-Clause License (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.tencent.cloud.plugin.mq.lane.kafka;
import java.lang.reflect.Field;
import java.util.concurrent.atomic.AtomicBoolean;
import com.tencent.cloud.common.tsf.TsfContextUtils;
import com.tencent.cloud.plugin.mq.lane.tsf.TsfActiveLane;
import com.tencent.cloud.polaris.context.PolarisSDKContextManager;
import com.tencent.cloud.polaris.discovery.PolarisDiscoveryHandler;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.boot.autoconfigure.AutoConfigurations;
import org.springframework.boot.test.context.runner.ApplicationContextRunner;
import org.springframework.cloud.client.serviceregistry.Registration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
/**
* Test for {@link KafkaLaneAspectConfiguration}.
*/
public class KafkaLaneAspectConfigurationTest {
private ApplicationContextRunner contextRunner;
@BeforeEach
public void setUp() {
contextRunner = new ApplicationContextRunner()
.withConfiguration(AutoConfigurations.of(KafkaLaneAspectConfiguration.class))
.withUserConfiguration(MockConfiguration.class);
}
@AfterEach
public void tearDown() {
// Reset static state for clean tests using reflection
resetTsfContextUtilsStaticFields();
}
private void resetTsfContextUtilsStaticFields() {
try {
// Reset isOnlyTsfConsulEnabledFirstConfiguration
Field isOnlyTsfConsulEnabledFirstConfigurationField = TsfContextUtils.class.getDeclaredField("isOnlyTsfConsulEnabledFirstConfiguration");
isOnlyTsfConsulEnabledFirstConfigurationField.setAccessible(true);
AtomicBoolean isOnlyTsfConsulEnabledFirstConfiguration = (AtomicBoolean) isOnlyTsfConsulEnabledFirstConfigurationField.get(null);
isOnlyTsfConsulEnabledFirstConfiguration.set(true);
// Reset isTsfConsulEnabledFirstConfiguration
Field isTsfConsulEnabledFirstConfigurationField = TsfContextUtils.class.getDeclaredField("isTsfConsulEnabledFirstConfiguration");
isTsfConsulEnabledFirstConfigurationField.setAccessible(true);
AtomicBoolean isTsfConsulEnabledFirstConfiguration = (AtomicBoolean) isTsfConsulEnabledFirstConfigurationField.get(null);
isTsfConsulEnabledFirstConfiguration.set(true);
// Reset tsfConsulEnabled
Field tsfConsulEnabledField = TsfContextUtils.class.getDeclaredField("tsfConsulEnabled");
tsfConsulEnabledField.setAccessible(true);
tsfConsulEnabledField.setBoolean(null, false);
// Reset onlyTsfConsulEnabled
Field onlyTsfConsulEnabledField = TsfContextUtils.class.getDeclaredField("onlyTsfConsulEnabled");
onlyTsfConsulEnabledField.setAccessible(true);
onlyTsfConsulEnabledField.setBoolean(null, false);
}
catch (Exception e) {
throw new RuntimeException("Failed to reset TsfContextUtils static fields", e);
}
}
@Test
public void testKafkaLaneAspectBeanCreationWhenKafkaTemplatePresent() {
resetTsfContextUtilsStaticFields();
// Simulate KafkaTemplate class presence
contextRunner
.withPropertyValues("spring.cloud.polaris.lane.kafka.lane-on=true")
.run(context -> {
// KafkaLaneAspect should be created when KafkaTemplate is present
assertThat(context).hasSingleBean(KafkaLaneAspect.class);
KafkaLaneAspect aspect = context.getBean(KafkaLaneAspect.class);
assertThat(aspect).isNotNull();
});
}
@Test
public void testTsfActiveLaneBeanNotCreatedWhenPolarisAddressPresent() {
resetTsfContextUtilsStaticFields();
// Simulate Polaris address present (not only TSF Consul)
contextRunner
.withPropertyValues(
"tsf_consul_enable=true",
"tsf_consul_ip=127.0.0.1",
"spring.cloud.polaris.address=127.0.0.1:8091"
)
.run(context -> {
// TsfActiveLane should NOT be created when Polaris address is present
assertThat(context).doesNotHaveBean(TsfActiveLane.class);
});
}
@Test
public void testKafkaLanePropertiesEnabled() {
resetTsfContextUtilsStaticFields();
contextRunner
.withPropertyValues(
"spring.cloud.polaris.lane.kafka.lane-on=true",
"spring.cloud.polaris.lane.kafka.lane-consume-main=true",
"spring.cloud.polaris.lane.kafka.main-consume-lane=true"
)
.run(context -> {
KafkaLaneProperties properties = context.getBean(KafkaLaneProperties.class);
assertThat(properties.getLaneOn()).isTrue();
assertThat(properties.getLaneConsumeMain()).isTrue();
assertThat(properties.getMainConsumeLane()).isTrue();
});
}
@Test
public void testBeanDependenciesInjection() {
resetTsfContextUtilsStaticFields();
contextRunner
.withPropertyValues("spring.cloud.polaris.lane.kafka.lane-on=true")
.run(context -> {
// Verify that all required dependencies are properly injected
assertThat(context).hasSingleBean(KafkaLaneAspect.class);
KafkaLaneAspect aspect = context.getBean(KafkaLaneAspect.class);
// The aspect should have all required dependencies
assertThat(aspect).isNotNull();
// Verify that KafkaLaneProperties is properly configured
KafkaLaneProperties properties = context.getBean(KafkaLaneProperties.class);
assertThat(properties).isNotNull();
assertThat(properties.getLaneOn()).isTrue();
});
}
@Configuration
static class MockConfiguration {
@Bean
public PolarisSDKContextManager polarisSDKContextManager() {
return mock(PolarisSDKContextManager.class);
}
@Bean
public PolarisDiscoveryHandler polarisDiscoveryHandler() {
return mock(PolarisDiscoveryHandler.class);
}
@Bean
public Registration registration() {
return mock(Registration.class);
}
}
}

@ -0,0 +1,323 @@
/*
* Tencent is pleased to support the open source community by making spring-cloud-tencent available.
*
* Copyright (C) 2021 Tencent. All rights reserved.
*
* Licensed under the BSD 3-Clause License (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.tencent.cloud.plugin.mq.lane.kafka;
import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import com.tencent.cloud.common.tsf.TsfContextUtils;
import com.tencent.cloud.plugin.mq.lane.PolarisActiveLane;
import com.tencent.cloud.polaris.context.PolarisSDKContextManager;
import com.tencent.cloud.polaris.discovery.PolarisDiscoveryHandler;
import com.tencent.polaris.plugins.router.lane.LaneUtils;
import com.tencent.polaris.specification.api.v1.traffic.manage.LaneProto;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.aspectj.lang.ProceedingJoinPoint;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.springframework.cloud.client.serviceregistry.Registration;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
/**
* Test for {@link KafkaLaneAspect}. Instance in lane.
*/
public class KafkaLaneAspectTest {
private KafkaLaneAspect kafkaLaneAspect;
private PolarisSDKContextManager polarisSDKContextManager;
private KafkaLaneProperties kafkaLaneProperties;
private PolarisActiveLane polarisActiveLane;
private LaneProto.LaneGroup group;
private MockedStatic<LaneUtils> laneUtilsMockedStatic;
private MockedStatic<TsfContextUtils> tsfContextUtilsMockedStatic;
@BeforeEach
public void setUp() throws Exception {
polarisSDKContextManager = mock(PolarisSDKContextManager.class);
group = mock(LaneProto.LaneGroup.class);
kafkaLaneProperties = new KafkaLaneProperties();
polarisActiveLane = new PolarisActiveLane(mock(PolarisSDKContextManager.class), mock(PolarisDiscoveryHandler.class), mock(Registration.class));
laneUtilsMockedStatic = Mockito.mockStatic(LaneUtils.class);
tsfContextUtilsMockedStatic = Mockito.mockStatic(TsfContextUtils.class);
Field laneField = PolarisActiveLane.class.getDeclaredField("instanceLaneTag");
laneField.setAccessible(true);
laneField.set(polarisActiveLane, "test-lane"); // in lane
Field groupsField = PolarisActiveLane.class.getDeclaredField("groups");
groupsField.setAccessible(true);
groupsField.set(polarisActiveLane, Collections.singletonList(group));
Field serviceInLaneField = PolarisActiveLane.class.getDeclaredField("serviceInLane");
serviceInLaneField.setAccessible(true);
serviceInLaneField.setBoolean(polarisActiveLane, true);
kafkaLaneAspect = new KafkaLaneAspect(polarisSDKContextManager, kafkaLaneProperties, polarisActiveLane);
}
@AfterEach
public void tearDown() {
laneUtilsMockedStatic.close();
tsfContextUtilsMockedStatic.close();
}
@Test
public void testProducerAspectWhenLaneDisabled() throws Throwable {
// Given
kafkaLaneProperties.setLaneOn(false);
ProceedingJoinPoint pjp = mock(ProceedingJoinPoint.class);
Object[] args = new Object[] {mock(ProducerRecord.class)};
when(pjp.getArgs()).thenReturn(args);
when(pjp.proceed(args)).thenReturn("result");
// When
Object result = kafkaLaneAspect.aroundProducerMessage(pjp);
// Then
assertThat(result).isEqualTo("result");
verify(pjp).proceed(args);
}
@Test
public void testProducerAspectWhenNoLaneId() throws Throwable {
// Given
kafkaLaneProperties.setLaneOn(true);
laneUtilsMockedStatic.when(() -> LaneUtils.fetchLaneByCaller(any(), any(), any())).thenReturn(null);
ProceedingJoinPoint pjp = mock(ProceedingJoinPoint.class);
Object[] args = new Object[] {mock(ProducerRecord.class)};
when(pjp.getArgs()).thenReturn(args);
when(pjp.proceed(args)).thenReturn("result");
// When
Object result = kafkaLaneAspect.aroundProducerMessage(pjp);
// Then
assertThat(result).isEqualTo("result");
verify(pjp).proceed(args);
}
@Test
public void testProducerAspectWithProducerRecord() throws Throwable {
// Given
kafkaLaneProperties.setLaneOn(true);
String laneId = "test-lane-id";
laneUtilsMockedStatic.when(() -> LaneUtils.fetchLaneByCaller(any(), any(), any())).thenReturn(laneId);
ProceedingJoinPoint pjp = mock(ProceedingJoinPoint.class);
KafkaTemplate kafkaTemplate = mock(KafkaTemplate.class);
ProducerRecord producerRecord = new ProducerRecord<>("test-topic", "test-value");
when(pjp.getTarget()).thenReturn(kafkaTemplate);
when(pjp.getArgs()).thenReturn(new Object[] {producerRecord});
when(kafkaTemplate.send(producerRecord)).thenReturn(null);
// When
kafkaLaneAspect.aroundProducerMessage(pjp);
// Then
Iterator<Header> headers = producerRecord.headers().headers(polarisActiveLane.getLaneHeaderKey()).iterator();
assertThat(headers.hasNext()).isTrue();
Header laneHeader = headers.next();
assertThat(new String(laneHeader.value(), StandardCharsets.UTF_8)).isEqualTo(laneId);
}
@Test
public void testProducerAspectWithMessage() throws Throwable {
// Given
kafkaLaneProperties.setLaneOn(true);
String laneId = "test-lane-id";
laneUtilsMockedStatic.when(() -> LaneUtils.fetchLaneByCaller(any(), any(), any())).thenReturn(laneId);
ProceedingJoinPoint pjp = mock(ProceedingJoinPoint.class);
KafkaTemplate kafkaTemplate = mock(KafkaTemplate.class);
RecordMessageConverter recordMessageConverter = mock(RecordMessageConverter.class);
Message message = MessageBuilder.withPayload("test-payload").build();
ProducerRecord producerRecord = new ProducerRecord<>("test-topic", "test-value");
when(pjp.getTarget()).thenReturn(kafkaTemplate);
when(pjp.getArgs()).thenReturn(new Object[] {message});
when(kafkaTemplate.getDefaultTopic()).thenReturn("test-topic");
when(kafkaTemplate.send(producerRecord)).thenReturn(null);
when(kafkaTemplate.getMessageConverter()).thenReturn(recordMessageConverter);
// When
kafkaLaneAspect.aroundProducerMessage(pjp);
}
@Test
public void testConsumerAspectWhenLaneDisabled() throws Throwable {
// Given
kafkaLaneProperties.setLaneOn(false);
ProceedingJoinPoint pjp = mock(ProceedingJoinPoint.class);
Object[] args = new Object[] {mock(ConsumerRecord.class)};
when(pjp.getArgs()).thenReturn(args);
when(pjp.proceed(args)).thenReturn("result");
// When
Object result = kafkaLaneAspect.aroundConsumerMessage(pjp);
// Then
assertThat(result).isEqualTo("result");
verify(pjp).proceed(args);
}
@Test
public void testConsumerAspectWithLaneHeader_LaneIdExist() throws Throwable {
// Given
LaneProto.LaneRule rule = mock(LaneProto.LaneRule.class);
when(group.getRulesList()).thenReturn(Collections.singletonList(rule));
when(rule.getGroupName()).thenReturn("test-group");
when(rule.getName()).thenReturn("test-lane-name");
when(rule.getDefaultLabelValue()).thenReturn("test-lane");
kafkaLaneProperties.setLaneOn(true);
String laneId = "test-group/test-lane-name"; // valid lane id
laneUtilsMockedStatic.when(() -> LaneUtils.buildStainLabel(rule)).thenReturn(laneId);
ConsumerRecord consumerRecord = new ConsumerRecord<>("test-topic", 0, 0L, "key", "value");
consumerRecord.headers().add(polarisActiveLane.getLaneHeaderKey(), laneId.getBytes(StandardCharsets.UTF_8));
ProceedingJoinPoint pjp = mock(ProceedingJoinPoint.class);
Object[] args = new Object[] {consumerRecord};
when(pjp.getArgs()).thenReturn(args);
when(pjp.proceed(args)).thenReturn("result");
// When
Object result = kafkaLaneAspect.aroundConsumerMessage(pjp);
// Then
assertThat(result).isEqualTo("result");
}
@Test
public void testGetConsumerRecordLaneIdFromHeader() {
// Given
ConsumerRecord consumerRecord = new ConsumerRecord<>("test-topic", 0, 0L, "key", "value");
String expectedLaneId = "test-lane-id";
consumerRecord.headers()
.add(polarisActiveLane.getLaneHeaderKey(), expectedLaneId.getBytes(StandardCharsets.UTF_8));
// When
String laneId = kafkaLaneAspect.getConsumerRecordLaneId(consumerRecord);
// Then
assertThat(laneId).isEqualTo(expectedLaneId);
}
@Test
public void testGetConsumerRecordLaneIdFromCallerLane() {
// Given
ConsumerRecord consumerRecord = new ConsumerRecord<>("test-topic", 0, 0L, "key", "value");
String expectedLaneId = "lane-test";
laneUtilsMockedStatic.when(() -> LaneUtils.getCallerLaneId()).thenReturn(expectedLaneId);
// When
String laneId = kafkaLaneAspect.getConsumerRecordLaneId(consumerRecord);
// Then
assertThat(laneId).isEqualTo(expectedLaneId);
}
@Test
public void testIfConsumeWithNoLaneId() {
// Given
kafkaLaneProperties.setLaneConsumeMain(true);
// When
boolean shouldConsume = kafkaLaneAspect.ifConsume("");
// Then
assertThat(shouldConsume).isTrue(); // Because laneConsumeMain is true
// Given
kafkaLaneProperties.setLaneConsumeMain(false);
// When
shouldConsume = kafkaLaneAspect.ifConsume("");
// Then
assertThat(shouldConsume).isFalse(); // Because laneConsumeMain is false
}
@Test
public void testConsumerAspectWithBatchMessages() throws Throwable {
// Given
kafkaLaneProperties.setLaneOn(true);
List<ConsumerRecord> messageList = new ArrayList<>();
ConsumerRecord record1 = new ConsumerRecord<>("test-topic", 0, 0L, "key", "value1");
record1.headers().add(polarisActiveLane.getLaneHeaderKey(), "lane1".getBytes(StandardCharsets.UTF_8));
ConsumerRecord record2 = new ConsumerRecord<>("test-topic", 0, 1L, "key", "value2");
record2.headers().add(polarisActiveLane.getLaneHeaderKey(), "lane2".getBytes(StandardCharsets.UTF_8));
messageList.add(record1);
messageList.add(record2);
Acknowledgment acknowledgment = mock(Acknowledgment.class);
ProceedingJoinPoint pjp = mock(ProceedingJoinPoint.class);
Object[] args = new Object[] {messageList, acknowledgment};
when(pjp.getArgs()).thenReturn(args);
when(pjp.proceed(any())).thenReturn("result");
// When
Object result = kafkaLaneAspect.aroundConsumerMessage(pjp);
// Then
assertThat(result).isEqualTo("result");
verify(pjp).proceed(any());
}
@Test
public void testConsumerAspectWithEmptyBatch() throws Throwable {
// Given
kafkaLaneProperties.setLaneOn(true);
List<ConsumerRecord> emptyList = new ArrayList<>();
ProceedingJoinPoint pjp = mock(ProceedingJoinPoint.class);
Object[] args = new Object[] {emptyList};
when(pjp.getArgs()).thenReturn(args);
when(pjp.proceed(args)).thenReturn("result");
// When
Object result = kafkaLaneAspect.aroundConsumerMessage(pjp);
// Then
assertThat(result).isEqualTo("result");
verify(pjp).proceed(args);
}
}

@ -0,0 +1,333 @@
/*
* Tencent is pleased to support the open source community by making spring-cloud-tencent available.
*
* Copyright (C) 2021 Tencent. All rights reserved.
*
* Licensed under the BSD 3-Clause License (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.tencent.cloud.plugin.mq.lane.kafka;
import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import com.tencent.cloud.common.tsf.TsfContextUtils;
import com.tencent.cloud.plugin.mq.lane.PolarisActiveLane;
import com.tencent.cloud.polaris.context.PolarisSDKContextManager;
import com.tencent.cloud.polaris.discovery.PolarisDiscoveryHandler;
import com.tencent.polaris.plugins.router.lane.LaneUtils;
import com.tencent.polaris.specification.api.v1.traffic.manage.LaneProto;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.aspectj.lang.ProceedingJoinPoint;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.springframework.cloud.client.serviceregistry.Registration;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
/**
* Test for {@link KafkaLaneAspect}. Instance and service not in lane.
*/
public class KafkaLaneAspectTest2 {
private KafkaLaneAspect kafkaLaneAspect;
private PolarisSDKContextManager polarisSDKContextManager;
private KafkaLaneProperties kafkaLaneProperties;
private PolarisActiveLane polarisActiveLane;
private LaneProto.LaneGroup group;
private MockedStatic<LaneUtils> laneUtilsMockedStatic;
private MockedStatic<TsfContextUtils> tsfContextUtilsMockedStatic;
@BeforeEach
public void setUp() throws Exception {
polarisSDKContextManager = mock(PolarisSDKContextManager.class);
group = mock(LaneProto.LaneGroup.class);
kafkaLaneProperties = new KafkaLaneProperties();
polarisActiveLane = new PolarisActiveLane(mock(PolarisSDKContextManager.class), mock(PolarisDiscoveryHandler.class), mock(Registration.class));
laneUtilsMockedStatic = Mockito.mockStatic(LaneUtils.class);
tsfContextUtilsMockedStatic = Mockito.mockStatic(TsfContextUtils.class);
Field laneField = PolarisActiveLane.class.getDeclaredField("instanceLaneTag");
laneField.setAccessible(true);
laneField.set(polarisActiveLane, ""); // not in lane
Field groupsField = PolarisActiveLane.class.getDeclaredField("groups");
groupsField.setAccessible(true);
groupsField.set(polarisActiveLane, Collections.singletonList(group));
Field serviceInLaneField = PolarisActiveLane.class.getDeclaredField("serviceInLane");
serviceInLaneField.setAccessible(true);
serviceInLaneField.setBoolean(polarisActiveLane, false);
kafkaLaneAspect = new KafkaLaneAspect(polarisSDKContextManager, kafkaLaneProperties, polarisActiveLane);
}
@AfterEach
public void tearDown() {
laneUtilsMockedStatic.close();
tsfContextUtilsMockedStatic.close();
}
@Test
public void testProducerAspectWhenLaneDisabled() throws Throwable {
// Given
kafkaLaneProperties.setLaneOn(false);
ProceedingJoinPoint pjp = mock(ProceedingJoinPoint.class);
Object[] args = new Object[] {mock(ProducerRecord.class)};
when(pjp.getArgs()).thenReturn(args);
when(pjp.proceed(args)).thenReturn("result");
// When
Object result = kafkaLaneAspect.aroundProducerMessage(pjp);
// Then
assertThat(result).isEqualTo("result");
verify(pjp).proceed(args);
}
@Test
public void testProducerAspectWhenNoLaneId() throws Throwable {
// Given
kafkaLaneProperties.setLaneOn(true);
laneUtilsMockedStatic.when(() -> LaneUtils.fetchLaneByCaller(any(), any(), any())).thenReturn(null);
ProceedingJoinPoint pjp = mock(ProceedingJoinPoint.class);
Object[] args = new Object[] {mock(ProducerRecord.class)};
when(pjp.getArgs()).thenReturn(args);
when(pjp.proceed(args)).thenReturn("result");
// When
Object result = kafkaLaneAspect.aroundProducerMessage(pjp);
// Then
assertThat(result).isEqualTo("result");
verify(pjp).proceed(args);
}
@Test
public void testProducerAspectWithProducerRecord() throws Throwable {
// Given
kafkaLaneProperties.setLaneOn(true);
String laneId = "test-lane-id";
laneUtilsMockedStatic.when(() -> LaneUtils.fetchLaneByCaller(any(), any(), any())).thenReturn(laneId);
ProceedingJoinPoint pjp = mock(ProceedingJoinPoint.class);
KafkaTemplate kafkaTemplate = mock(KafkaTemplate.class);
ProducerRecord producerRecord = new ProducerRecord<>("test-topic", "test-value");
when(pjp.getTarget()).thenReturn(kafkaTemplate);
when(pjp.getArgs()).thenReturn(new Object[] {producerRecord});
when(kafkaTemplate.send(producerRecord)).thenReturn(null);
// When
kafkaLaneAspect.aroundProducerMessage(pjp);
// Then
Iterator<Header> headers = producerRecord.headers().headers(polarisActiveLane.getLaneHeaderKey()).iterator();
assertThat(headers.hasNext()).isTrue();
Header laneHeader = headers.next();
assertThat(new String(laneHeader.value(), StandardCharsets.UTF_8)).isEqualTo(laneId);
}
@Test
public void testProducerAspectWithMessage() throws Throwable {
// Given
kafkaLaneProperties.setLaneOn(true);
String laneId = "test-lane-id";
laneUtilsMockedStatic.when(() -> LaneUtils.fetchLaneByCaller(any(), any(), any())).thenReturn(laneId);
ProceedingJoinPoint pjp = mock(ProceedingJoinPoint.class);
KafkaTemplate kafkaTemplate = mock(KafkaTemplate.class);
RecordMessageConverter recordMessageConverter = mock(RecordMessageConverter.class);
Message message = MessageBuilder.withPayload("test-payload").build();
ProducerRecord producerRecord = new ProducerRecord<>("test-topic", "test-value");
when(pjp.getTarget()).thenReturn(kafkaTemplate);
when(pjp.getArgs()).thenReturn(new Object[] {message});
when(kafkaTemplate.getDefaultTopic()).thenReturn("test-topic");
when(kafkaTemplate.send(producerRecord)).thenReturn(null);
when(kafkaTemplate.getMessageConverter()).thenReturn(recordMessageConverter);
// When
kafkaLaneAspect.aroundProducerMessage(pjp);
}
@Test
public void testConsumerAspectWhenLaneDisabled() throws Throwable {
// Given
kafkaLaneProperties.setLaneOn(false);
ProceedingJoinPoint pjp = mock(ProceedingJoinPoint.class);
Object[] args = new Object[] {mock(ConsumerRecord.class)};
when(pjp.getArgs()).thenReturn(args);
when(pjp.proceed(args)).thenReturn("result");
// When
Object result = kafkaLaneAspect.aroundConsumerMessage(pjp);
// Then
assertThat(result).isEqualTo("result");
verify(pjp).proceed(args);
}
@Test
public void testConsumerAspectWithLaneHeader_LaneIdExist() throws Throwable {
// Given
LaneProto.LaneRule rule = mock(LaneProto.LaneRule.class);
when(group.getRulesList()).thenReturn(Collections.singletonList(rule));
when(rule.getGroupName()).thenReturn("test-group");
when(rule.getName()).thenReturn("test-lane-name");
when(rule.getDefaultLabelValue()).thenReturn("test-lane");
kafkaLaneProperties.setLaneOn(true);
String laneId = "test-group/test-lane-name"; // valid lane id
laneUtilsMockedStatic.when(() -> LaneUtils.buildStainLabel(rule)).thenReturn(laneId);
ConsumerRecord consumerRecord = new ConsumerRecord<>("test-topic", 0, 0L, "key", "value");
consumerRecord.headers().add(polarisActiveLane.getLaneHeaderKey(), laneId.getBytes(StandardCharsets.UTF_8));
ProceedingJoinPoint pjp = mock(ProceedingJoinPoint.class);
Object[] args = new Object[] {consumerRecord};
when(pjp.getArgs()).thenReturn(args);
when(pjp.proceed(args)).thenReturn("result");
// When
Object result = kafkaLaneAspect.aroundConsumerMessage(pjp);
// Then
assertThat(result).isEqualTo(KafkaLaneAspect.EMPTY_OBJECT);
// change main consume lane
kafkaLaneProperties.setMainConsumeLane(true);
result = kafkaLaneAspect.aroundConsumerMessage(pjp);
assertThat(result).isEqualTo("result");
//reset
kafkaLaneProperties.setMainConsumeLane(false);
}
@Test
public void testGetConsumerRecordLaneIdFromHeader() {
// Given
ConsumerRecord consumerRecord = new ConsumerRecord<>("test-topic", 0, 0L, "key", "value");
String expectedLaneId = "test-lane-id";
consumerRecord.headers()
.add(polarisActiveLane.getLaneHeaderKey(), expectedLaneId.getBytes(StandardCharsets.UTF_8));
// When
String laneId = kafkaLaneAspect.getConsumerRecordLaneId(consumerRecord);
// Then
assertThat(laneId).isEqualTo(expectedLaneId);
}
@Test
public void testGetConsumerRecordLaneIdFromCallerLane() {
// Given
ConsumerRecord consumerRecord = new ConsumerRecord<>("test-topic", 0, 0L, "key", "value");
String expectedLaneId = "lane-test";
laneUtilsMockedStatic.when(() -> LaneUtils.getCallerLaneId()).thenReturn(expectedLaneId);
// When
String laneId = kafkaLaneAspect.getConsumerRecordLaneId(consumerRecord);
// Then
assertThat(laneId).isEqualTo(expectedLaneId);
}
@Test
public void testIfConsumeWithNoLaneId() {
// Given
kafkaLaneProperties.setLaneConsumeMain(true);
// When
boolean shouldConsume = kafkaLaneAspect.ifConsume("");
// Then
assertThat(shouldConsume).isTrue(); // as baseline
// Given
kafkaLaneProperties.setLaneConsumeMain(false);
// When
shouldConsume = kafkaLaneAspect.ifConsume("");
// Then
assertThat(shouldConsume).isTrue(); // unrelated to LaneConsumeMain
}
@Test
public void testConsumerAspectWithBatchMessages() throws Throwable {
// Given
kafkaLaneProperties.setLaneOn(true);
List<ConsumerRecord> messageList = new ArrayList<>();
ConsumerRecord record1 = new ConsumerRecord<>("test-topic", 0, 0L, "key", "value1");
record1.headers().add(polarisActiveLane.getLaneHeaderKey(), "lane1".getBytes(StandardCharsets.UTF_8));
ConsumerRecord record2 = new ConsumerRecord<>("test-topic", 0, 1L, "key", "value2");
record2.headers().add(polarisActiveLane.getLaneHeaderKey(), "lane2".getBytes(StandardCharsets.UTF_8));
messageList.add(record1);
messageList.add(record2);
Acknowledgment acknowledgment = mock(Acknowledgment.class);
ProceedingJoinPoint pjp = mock(ProceedingJoinPoint.class);
Object[] args = new Object[] {messageList, acknowledgment};
when(pjp.getArgs()).thenReturn(args);
when(pjp.proceed(any())).thenReturn("result");
// When
Object result = kafkaLaneAspect.aroundConsumerMessage(pjp);
// Then
assertThat(result).isEqualTo("result");
verify(pjp).proceed(any());
}
@Test
public void testConsumerAspectWithEmptyBatch() throws Throwable {
// Given
kafkaLaneProperties.setLaneOn(true);
List<ConsumerRecord> emptyList = new ArrayList<>();
ProceedingJoinPoint pjp = mock(ProceedingJoinPoint.class);
Object[] args = new Object[] {emptyList};
when(pjp.getArgs()).thenReturn(args);
when(pjp.proceed(args)).thenReturn("result");
// When
Object result = kafkaLaneAspect.aroundConsumerMessage(pjp);
// Then
assertThat(result).isEqualTo("result");
verify(pjp).proceed(args);
}
}

@ -0,0 +1,172 @@
/*
* Tencent is pleased to support the open source community by making spring-cloud-tencent available.
*
* Copyright (C) 2021 Tencent. All rights reserved.
*
* Licensed under the BSD 3-Clause License (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.tencent.cloud.plugin.mq.lane.kafka;
import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import com.tencent.cloud.common.tsf.TsfContextUtils;
import com.tencent.cloud.plugin.mq.lane.PolarisActiveLane;
import com.tencent.cloud.polaris.context.PolarisSDKContextManager;
import com.tencent.cloud.polaris.discovery.PolarisDiscoveryHandler;
import com.tencent.polaris.plugins.router.lane.LaneUtils;
import com.tencent.polaris.specification.api.v1.traffic.manage.LaneProto;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.aspectj.lang.ProceedingJoinPoint;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.springframework.cloud.client.serviceregistry.Registration;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
/**
* Test for {@link KafkaLaneAspect}.
* Instance and service status change.
*/
public class KafkaLaneAspectTest3 {
private KafkaLaneAspect kafkaLaneAspect;
private PolarisSDKContextManager polarisSDKContextManager;
private KafkaLaneProperties kafkaLaneProperties;
private PolarisActiveLane polarisActiveLane;
private LaneProto.LaneGroup group;
private MockedStatic<LaneUtils> laneUtilsMockedStatic;
private MockedStatic<TsfContextUtils> tsfContextUtilsMockedStatic;
@BeforeEach
public void setUp() throws Exception {
polarisSDKContextManager = mock(PolarisSDKContextManager.class);
group = mock(LaneProto.LaneGroup.class);
kafkaLaneProperties = new KafkaLaneProperties();
kafkaLaneProperties.setLaneOn(true);
polarisActiveLane = new PolarisActiveLane(mock(PolarisSDKContextManager.class), mock(PolarisDiscoveryHandler.class), mock(Registration.class));
laneUtilsMockedStatic = Mockito.mockStatic(LaneUtils.class);
tsfContextUtilsMockedStatic = Mockito.mockStatic(TsfContextUtils.class);
kafkaLaneAspect = new KafkaLaneAspect(polarisSDKContextManager, kafkaLaneProperties, polarisActiveLane);
}
@AfterEach
public void tearDown() {
laneUtilsMockedStatic.close();
tsfContextUtilsMockedStatic.close();
}
@Test
public void testConsumerAspectWithLaneHeader_toInLane() throws Throwable {
// not in lane
Field laneField = PolarisActiveLane.class.getDeclaredField("instanceLaneTag");
laneField.setAccessible(true);
laneField.set(polarisActiveLane, "");
Field groupsField = PolarisActiveLane.class.getDeclaredField("groups");
groupsField.setAccessible(true);
groupsField.set(polarisActiveLane, Collections.singletonList(group));
Field serviceInLaneField = PolarisActiveLane.class.getDeclaredField("serviceInLane");
serviceInLaneField.setAccessible(true);
serviceInLaneField.setBoolean(polarisActiveLane, false);
String laneId = "test-group/test-lane-name"; // valid lane id
ConsumerRecord consumerRecord = new ConsumerRecord<>("test-topic", 0, 0L, "key", "value");
consumerRecord.headers().add(polarisActiveLane.getLaneHeaderKey(), laneId.getBytes(StandardCharsets.UTF_8));
ProceedingJoinPoint pjp = mock(ProceedingJoinPoint.class);
Object[] args = new Object[] {consumerRecord};
when(pjp.getArgs()).thenReturn(args);
when(pjp.proceed(args)).thenReturn("result");
// act
Object result = kafkaLaneAspect.aroundConsumerMessage(pjp);
// verify
assertThat(result).isEqualTo(KafkaLaneAspect.EMPTY_OBJECT); // not in lane, not consume
// in lane
LaneProto.LaneRule rule = mock(LaneProto.LaneRule.class);
when(group.getRulesList()).thenReturn(Collections.singletonList(rule));
when(rule.getGroupName()).thenReturn("test-group");
when(rule.getName()).thenReturn("test-lane-name");
when(rule.getDefaultLabelValue()).thenReturn("test-lane");
laneField.set(polarisActiveLane, "test-lane");
serviceInLaneField.setBoolean(polarisActiveLane, true);
laneUtilsMockedStatic.when(() -> LaneUtils.buildStainLabel(rule)).thenReturn(laneId);
result = kafkaLaneAspect.aroundConsumerMessage(pjp);
assertThat(result).isEqualTo("result");
}
@Test
public void testConsumerAspectWithLaneHeader_toNotInLane() throws Throwable {
// in lane
String laneId = "test-group/test-lane-name"; // valid lane id
LaneProto.LaneRule rule = mock(LaneProto.LaneRule.class);
when(group.getRulesList()).thenReturn(Collections.singletonList(rule));
when(rule.getGroupName()).thenReturn("test-group");
when(rule.getName()).thenReturn("test-lane-name");
when(rule.getDefaultLabelValue()).thenReturn("test-lane");
laneUtilsMockedStatic.when(() -> LaneUtils.buildStainLabel(rule)).thenReturn(laneId);
Field laneField = PolarisActiveLane.class.getDeclaredField("instanceLaneTag");
laneField.setAccessible(true);
laneField.set(polarisActiveLane, "test-lane");
Field groupsField = PolarisActiveLane.class.getDeclaredField("groups");
groupsField.setAccessible(true);
groupsField.set(polarisActiveLane, Collections.singletonList(group));
Field serviceInLaneField = PolarisActiveLane.class.getDeclaredField("serviceInLane");
serviceInLaneField.setAccessible(true);
serviceInLaneField.setBoolean(polarisActiveLane, true);
ConsumerRecord consumerRecord = new ConsumerRecord<>("test-topic", 0, 0L, "key", "value");
consumerRecord.headers().add(polarisActiveLane.getLaneHeaderKey(), laneId.getBytes(StandardCharsets.UTF_8));
ProceedingJoinPoint pjp = mock(ProceedingJoinPoint.class);
Object[] args = new Object[] {consumerRecord};
when(pjp.getArgs()).thenReturn(args);
when(pjp.proceed(args)).thenReturn("result");
// act
Object result = kafkaLaneAspect.aroundConsumerMessage(pjp);
// verify
assertThat(result).isEqualTo("result");
// not in lane
when(group.getRulesList()).thenReturn(Collections.emptyList());
laneField.set(polarisActiveLane, "");
serviceInLaneField.setBoolean(polarisActiveLane, false);
result = kafkaLaneAspect.aroundConsumerMessage(pjp);
assertThat(result).isEqualTo(KafkaLaneAspect.EMPTY_OBJECT);
}
}

@ -0,0 +1,108 @@
/*
* Tencent is pleased to support the open source community by making spring-cloud-tencent available.
*
* Copyright (C) 2021 Tencent. All rights reserved.
*
* Licensed under the BSD 3-Clause License (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.tencent.cloud.plugin.mq.lane.kafka;
import org.junit.jupiter.api.Test;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.boot.test.context.runner.ApplicationContextRunner;
import static org.assertj.core.api.Assertions.assertThat;
/**
* Test for {@link KafkaLaneProperties}.
*/
public class KafkaLanePropertiesTest {
private final ApplicationContextRunner contextRunner = new ApplicationContextRunner()
.withUserConfiguration(TestConfiguration.class);
@Test
public void testDefaultValues() {
this.contextRunner.run(context -> {
KafkaLaneProperties properties = context.getBean(KafkaLaneProperties.class);
// test default values
assertThat(properties.getLaneOn()).isFalse();
assertThat(properties.getLaneConsumeMain()).isFalse();
assertThat(properties.getMainConsumeLane()).isFalse();
});
}
@Test
public void testConfigurationPropertiesPrefix() {
this.contextRunner
.withPropertyValues(
"spring.cloud.polaris.lane.kafka.lane-on=true",
"spring.cloud.polaris.lane.kafka.lane-consume-main=true",
"spring.cloud.polaris.lane.kafka.main-consume-lane=true"
)
.run(context -> {
KafkaLaneProperties properties = context.getBean(KafkaLaneProperties.class);
// test properties
assertThat(properties.getLaneOn()).isTrue();
assertThat(properties.getLaneConsumeMain()).isTrue();
assertThat(properties.getMainConsumeLane()).isTrue();
});
}
@Test
public void testGetterAndSetter() {
KafkaLaneProperties properties = new KafkaLaneProperties();
// test laneOn property
properties.setLaneOn(true);
assertThat(properties.getLaneOn()).isTrue();
properties.setLaneOn(false);
assertThat(properties.getLaneOn()).isFalse();
// test laneConsumeMain property
properties.setLaneConsumeMain(true);
assertThat(properties.getLaneConsumeMain()).isTrue();
properties.setLaneConsumeMain(false);
assertThat(properties.getLaneConsumeMain()).isFalse();
// test mainConsumeLane property
properties.setMainConsumeLane(true);
assertThat(properties.getMainConsumeLane()).isTrue();
properties.setMainConsumeLane(false);
assertThat(properties.getMainConsumeLane()).isFalse();
}
@Test
public void testPartialConfiguration() {
this.contextRunner
.withPropertyValues(
"spring.cloud.polaris.lane.kafka.lane-on=true"
)
.run(context -> {
KafkaLaneProperties properties = context.getBean(KafkaLaneProperties.class);
// test laneOn property
assertThat(properties.getLaneOn()).isTrue();
// test other properties keep default values
assertThat(properties.getLaneConsumeMain()).isFalse();
assertThat(properties.getMainConsumeLane()).isFalse();
});
}
@EnableConfigurationProperties(KafkaLaneProperties.class)
static class TestConfiguration {
}
}

@ -0,0 +1,142 @@
/*
* Tencent is pleased to support the open source community by making spring-cloud-tencent available.
*
* Copyright (C) 2021 Tencent. All rights reserved.
*
* Licensed under the BSD 3-Clause License (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.tencent.cloud.plugin.mq.lane.kafka;
import java.lang.reflect.Field;
import com.tencent.cloud.common.tsf.TsfContextUtils;
import com.tencent.cloud.plugin.mq.lane.tsf.TsfActiveLane;
import com.tencent.cloud.polaris.context.PolarisSDKContextManager;
import com.tencent.cloud.polaris.discovery.PolarisDiscoveryHandler;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.boot.autoconfigure.AutoConfigurations;
import org.springframework.boot.test.context.runner.ApplicationContextRunner;
import org.springframework.cloud.client.serviceregistry.Registration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
/**
* Test for {@link KafkaLaneAspectConfiguration}.
*/
public class TsfKafkaLaneAspectConfigurationTest {
private ApplicationContextRunner contextRunner;
@BeforeEach
public void setUp() throws Exception {
// Reset onlyTsfConsulEnabled
Field onlyTsfConsulEnabledField = TsfContextUtils.class.getDeclaredField("onlyTsfConsulEnabled");
onlyTsfConsulEnabledField.setAccessible(true);
onlyTsfConsulEnabledField.setBoolean(null, true);
contextRunner = new ApplicationContextRunner()
.withPropertyValues(
"tsf_consul_enable=true",
"tsf_consul_ip=127.0.0.1",
"spring.cloud.polaris.address=" // Empty to simulate only TSF Consul
)
.withConfiguration(AutoConfigurations.of(KafkaLaneAspectConfiguration.class))
.withUserConfiguration(MockConfiguration.class);
}
@Test
public void testKafkaLaneAspectBeanCreationWhenKafkaTemplatePresent() {
// Simulate KafkaTemplate class presence
contextRunner
.withPropertyValues("spring.cloud.polaris.lane.kafka.lane-on=true")
.run(context -> {
// KafkaLaneAspect should be created when KafkaTemplate is present
assertThat(context).hasSingleBean(KafkaLaneAspect.class);
KafkaLaneAspect aspect = context.getBean(KafkaLaneAspect.class);
assertThat(aspect).isNotNull();
});
}
@Test
public void testKafkaLanePropertiesEnabled() {
contextRunner
.withPropertyValues(
"spring.cloud.polaris.lane.kafka.lane-on=true",
"spring.cloud.polaris.lane.kafka.lane-consume-main=true",
"spring.cloud.polaris.lane.kafka.main-consume-lane=true"
)
.run(context -> {
KafkaLaneProperties properties = context.getBean(KafkaLaneProperties.class);
assertThat(properties.getLaneOn()).isTrue();
assertThat(properties.getLaneConsumeMain()).isTrue();
assertThat(properties.getMainConsumeLane()).isTrue();
});
}
@Test
public void testBeanDependenciesInjection() {
contextRunner
.withPropertyValues("spring.cloud.polaris.lane.kafka.lane-on=true")
.run(context -> {
// Verify that all required dependencies are properly injected
assertThat(context).hasSingleBean(KafkaLaneAspect.class);
KafkaLaneAspect aspect = context.getBean(KafkaLaneAspect.class);
// The aspect should have all required dependencies
assertThat(aspect).isNotNull();
// Verify that KafkaLaneProperties is properly configured
KafkaLaneProperties properties = context.getBean(KafkaLaneProperties.class);
assertThat(properties).isNotNull();
assertThat(properties.getLaneOn()).isTrue();
});
}
@Test
public void testTsfActiveLaneBeanCreationWhenOnlyTsfConsulEnabled() {
// Simulate Only TSF Consul enabled condition
contextRunner
.run(context -> {
// TsfActiveLane should be created when only TSF Consul is enabled
assertThat(context).hasSingleBean(TsfActiveLane.class);
TsfActiveLane tsfActiveLane = context.getBean(TsfActiveLane.class);
assertThat(tsfActiveLane).isNotNull();
});
}
@Configuration
static class MockConfiguration {
@Bean
public PolarisSDKContextManager polarisSDKContextManager() {
return mock(PolarisSDKContextManager.class);
}
@Bean
public PolarisDiscoveryHandler polarisDiscoveryHandler() {
return mock(PolarisDiscoveryHandler.class);
}
@Bean
public Registration registration() {
return mock(Registration.class);
}
}
}

@ -0,0 +1,314 @@
/*
* Tencent is pleased to support the open source community by making spring-cloud-tencent available.
*
* Copyright (C) 2021 Tencent. All rights reserved.
*
* Licensed under the BSD 3-Clause License (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.tencent.cloud.plugin.mq.lane.kafka;
import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import com.tencent.cloud.common.tsf.TsfContextUtils;
import com.tencent.cloud.plugin.mq.lane.tsf.TsfActiveLane;
import com.tencent.cloud.polaris.context.PolarisSDKContextManager;
import com.tencent.cloud.polaris.discovery.PolarisDiscoveryHandler;
import com.tencent.polaris.plugins.router.lane.LaneUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.aspectj.lang.ProceedingJoinPoint;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
/**
* Test for {@link KafkaLaneAspect}, with only tsf consul enabled. Instance in lane.
*/
public class TsfKafkaLaneAspectTest {
private KafkaLaneAspect kafkaLaneAspect;
private PolarisSDKContextManager polarisSDKContextManager;
private KafkaLaneProperties kafkaLaneProperties;
private TsfActiveLane tsfActiveLane;
private MockedStatic<LaneUtils> laneUtilsMockedStatic;
private MockedStatic<TsfContextUtils> tsfContextUtilsMockedStatic;
@BeforeEach
public void setUp() throws Exception {
polarisSDKContextManager = mock(PolarisSDKContextManager.class);
kafkaLaneProperties = new KafkaLaneProperties();
tsfActiveLane = new TsfActiveLane(polarisSDKContextManager, mock(PolarisDiscoveryHandler.class));
laneUtilsMockedStatic = Mockito.mockStatic(LaneUtils.class);
tsfContextUtilsMockedStatic = Mockito.mockStatic(TsfContextUtils.class);
tsfContextUtilsMockedStatic.when(TsfContextUtils::isOnlyTsfConsulEnabled).thenReturn(true);
// reset currentGroupLaneIds, instance in lane
Field currentGroupLaneIdsField = TsfActiveLane.class.getDeclaredField("currentGroupLaneIds");
currentGroupLaneIdsField.setAccessible(true);
currentGroupLaneIdsField.set(tsfActiveLane, new HashSet<>(Collections.singletonList("lane1")));
kafkaLaneAspect = new KafkaLaneAspect(polarisSDKContextManager, kafkaLaneProperties, tsfActiveLane);
}
@AfterEach
public void tearDown() {
laneUtilsMockedStatic.close();
tsfContextUtilsMockedStatic.close();
}
@Test
public void testProducerAspectWhenLaneDisabled() throws Throwable {
// Given
kafkaLaneProperties.setLaneOn(false);
ProceedingJoinPoint pjp = mock(ProceedingJoinPoint.class);
Object[] args = new Object[] {mock(ProducerRecord.class)};
when(pjp.getArgs()).thenReturn(args);
when(pjp.proceed(args)).thenReturn("result");
// When
Object result = kafkaLaneAspect.aroundProducerMessage(pjp);
// Then
assertThat(result).isEqualTo("result");
verify(pjp).proceed(args);
}
@Test
public void testProducerAspectWhenNoLaneId() throws Throwable {
// Given
kafkaLaneProperties.setLaneOn(true);
laneUtilsMockedStatic.when(() -> LaneUtils.fetchLaneByCaller(any(), any(), any())).thenReturn(null);
ProceedingJoinPoint pjp = mock(ProceedingJoinPoint.class);
Object[] args = new Object[] {mock(ProducerRecord.class)};
when(pjp.getArgs()).thenReturn(args);
when(pjp.proceed(args)).thenReturn("result");
// When
Object result = kafkaLaneAspect.aroundProducerMessage(pjp);
// Then
assertThat(result).isEqualTo("result");
verify(pjp).proceed(args);
}
@Test
public void testProducerAspectWithProducerRecord() throws Throwable {
// Given
kafkaLaneProperties.setLaneOn(true);
String laneId = "test-lane-id";
laneUtilsMockedStatic.when(() -> LaneUtils.fetchLaneByCaller(any(), any(), any())).thenReturn(laneId);
ProceedingJoinPoint pjp = mock(ProceedingJoinPoint.class);
KafkaTemplate kafkaTemplate = mock(KafkaTemplate.class);
ProducerRecord producerRecord = new ProducerRecord<>("test-topic", "test-value");
when(pjp.getTarget()).thenReturn(kafkaTemplate);
when(pjp.getArgs()).thenReturn(new Object[] {producerRecord});
when(kafkaTemplate.send(producerRecord)).thenReturn(null);
// When
kafkaLaneAspect.aroundProducerMessage(pjp);
// Then
Iterator<Header> headers = producerRecord.headers().headers(tsfActiveLane.getLaneHeaderKey()).iterator();
assertThat(headers.hasNext()).isTrue();
Header laneHeader = headers.next();
assertThat(new String(laneHeader.value(), StandardCharsets.UTF_8)).isEqualTo(laneId);
}
@Test
public void testProducerAspectWithMessage() throws Throwable {
// Given
kafkaLaneProperties.setLaneOn(true);
String laneId = "test-lane-id";
laneUtilsMockedStatic.when(() -> LaneUtils.fetchLaneByCaller(any(), any(), any())).thenReturn(laneId);
ProceedingJoinPoint pjp = mock(ProceedingJoinPoint.class);
KafkaTemplate kafkaTemplate = mock(KafkaTemplate.class);
RecordMessageConverter recordMessageConverter = mock(RecordMessageConverter.class);
Message message = MessageBuilder.withPayload("test-payload").build();
ProducerRecord producerRecord = new ProducerRecord<>("test-topic", "test-value");
when(pjp.getTarget()).thenReturn(kafkaTemplate);
when(pjp.getArgs()).thenReturn(new Object[] {message});
when(kafkaTemplate.getDefaultTopic()).thenReturn("test-topic");
when(kafkaTemplate.send(producerRecord)).thenReturn(null);
when(kafkaTemplate.getMessageConverter()).thenReturn(recordMessageConverter);
// When
kafkaLaneAspect.aroundProducerMessage(pjp);
}
@Test
public void testConsumerAspectWhenLaneDisabled() throws Throwable {
// Given
kafkaLaneProperties.setLaneOn(false);
ProceedingJoinPoint pjp = mock(ProceedingJoinPoint.class);
Object[] args = new Object[] {mock(ConsumerRecord.class)};
when(pjp.getArgs()).thenReturn(args);
when(pjp.proceed(args)).thenReturn("result");
// When
Object result = kafkaLaneAspect.aroundConsumerMessage(pjp);
// Then
assertThat(result).isEqualTo("result");
verify(pjp).proceed(args);
}
@Test
public void testConsumerAspectWithLaneHeader_LaneIdExist() throws Throwable {
// Given
kafkaLaneProperties.setLaneOn(true);
Field currentGroupLaneIdsField = TsfActiveLane.class.getDeclaredField("currentGroupLaneIds");
currentGroupLaneIdsField.setAccessible(true);
currentGroupLaneIdsField.set(tsfActiveLane, new HashSet<>(Collections.singletonList("lane-test")));
String laneId = "tsf/lane-test"; // valid lane id
ConsumerRecord consumerRecord = new ConsumerRecord<>("test-topic", 0, 0L, "key", "value");
consumerRecord.headers().add(tsfActiveLane.getLaneHeaderKey(), laneId.getBytes(StandardCharsets.UTF_8));
ProceedingJoinPoint pjp = mock(ProceedingJoinPoint.class);
Object[] args = new Object[] {consumerRecord};
when(pjp.getArgs()).thenReturn(args);
when(pjp.proceed(args)).thenReturn("result");
// When
Object result = kafkaLaneAspect.aroundConsumerMessage(pjp);
// Then
assertThat(result).isEqualTo("result");
}
@Test
public void testGetConsumerRecordLaneIdFromHeader() {
// Given
ConsumerRecord consumerRecord = new ConsumerRecord<>("test-topic", 0, 0L, "key", "value");
String expectedLaneId = "test-lane-id";
consumerRecord.headers().add(tsfActiveLane.getLaneHeaderKey(), expectedLaneId.getBytes(StandardCharsets.UTF_8));
// When
String laneId = kafkaLaneAspect.getConsumerRecordLaneId(consumerRecord);
// Then
assertThat(laneId).isEqualTo(expectedLaneId);
}
@Test
public void testGetConsumerRecordLaneIdFromCallerLane() {
// Given
ConsumerRecord consumerRecord = new ConsumerRecord<>("test-topic", 0, 0L, "key", "value");
String expectedLaneId = "lane-test";
laneUtilsMockedStatic.when(() -> LaneUtils.getCallerLaneId()).thenReturn(expectedLaneId);
// When
String laneId = kafkaLaneAspect.getConsumerRecordLaneId(consumerRecord);
// Then
assertThat(laneId).isEqualTo("tsf/" + expectedLaneId);
}
@Test
public void testIfConsumeWithNoLaneId() throws Exception {
Field currentGroupLaneIdsField = TsfActiveLane.class.getDeclaredField("currentGroupLaneIds");
currentGroupLaneIdsField.setAccessible(true);
currentGroupLaneIdsField.set(tsfActiveLane, new HashSet<>(Arrays.asList("lane1", "lane2")));
// Given
kafkaLaneProperties.setLaneConsumeMain(true);
// When
boolean shouldConsume = kafkaLaneAspect.ifConsume("");
// Then
assertThat(shouldConsume).isTrue(); // Because laneConsumeMain is true
// Given
kafkaLaneProperties.setLaneConsumeMain(false);
// When
shouldConsume = kafkaLaneAspect.ifConsume("");
// Then
assertThat(shouldConsume).isFalse(); // Because laneConsumeMain is false
}
@Test
public void testConsumerAspectWithBatchMessages() throws Throwable {
// Given
kafkaLaneProperties.setLaneOn(true);
List<ConsumerRecord> messageList = new ArrayList<>();
ConsumerRecord record1 = new ConsumerRecord<>("test-topic", 0, 0L, "key", "value1");
record1.headers().add(tsfActiveLane.getLaneHeaderKey(), "lane1".getBytes(StandardCharsets.UTF_8));
ConsumerRecord record2 = new ConsumerRecord<>("test-topic", 0, 1L, "key", "value2");
record2.headers().add(tsfActiveLane.getLaneHeaderKey(), "lane2".getBytes(StandardCharsets.UTF_8));
messageList.add(record1);
messageList.add(record2);
Acknowledgment acknowledgment = mock(Acknowledgment.class);
ProceedingJoinPoint pjp = mock(ProceedingJoinPoint.class);
Object[] args = new Object[] {messageList, acknowledgment};
when(pjp.getArgs()).thenReturn(args);
when(pjp.proceed(any())).thenReturn("result");
// When
Object result = kafkaLaneAspect.aroundConsumerMessage(pjp);
// Then
assertThat(result).isEqualTo("result");
verify(pjp).proceed(any());
}
@Test
public void testConsumerAspectWithEmptyBatch() throws Throwable {
// Given
kafkaLaneProperties.setLaneOn(true);
List<ConsumerRecord> emptyList = new ArrayList<>();
ProceedingJoinPoint pjp = mock(ProceedingJoinPoint.class);
Object[] args = new Object[] {emptyList};
when(pjp.getArgs()).thenReturn(args);
when(pjp.proceed(args)).thenReturn("result");
// When
Object result = kafkaLaneAspect.aroundConsumerMessage(pjp);
// Then
assertThat(result).isEqualTo("result");
verify(pjp).proceed(args);
}
}

@ -0,0 +1,347 @@
/*
* Tencent is pleased to support the open source community by making spring-cloud-tencent available.
*
* Copyright (C) 2021 Tencent. All rights reserved.
*
* Licensed under the BSD 3-Clause License (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.tencent.cloud.plugin.mq.lane.kafka;
import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import com.tencent.cloud.common.tsf.TsfContextUtils;
import com.tencent.cloud.plugin.mq.lane.tsf.TsfActiveLane;
import com.tencent.cloud.polaris.context.PolarisSDKContextManager;
import com.tencent.cloud.polaris.discovery.PolarisDiscoveryHandler;
import com.tencent.polaris.plugins.router.lane.LaneUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.aspectj.lang.ProceedingJoinPoint;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
/**
* Test for {@link KafkaLaneAspect}, with only tsf consul enabled. Instance not in lane.
*/
public class TsfKafkaLaneAspectTest2 {
private KafkaLaneAspect kafkaLaneAspect;
private PolarisSDKContextManager polarisSDKContextManager;
private KafkaLaneProperties kafkaLaneProperties;
private TsfActiveLane tsfActiveLane;
private MockedStatic<LaneUtils> laneUtilsMockedStatic;
private MockedStatic<TsfContextUtils> tsfContextUtilsMockedStatic;
@BeforeEach
public void setUp() throws Exception {
polarisSDKContextManager = mock(PolarisSDKContextManager.class);
kafkaLaneProperties = new KafkaLaneProperties();
tsfActiveLane = new TsfActiveLane(polarisSDKContextManager, mock(PolarisDiscoveryHandler.class));
laneUtilsMockedStatic = Mockito.mockStatic(LaneUtils.class);
tsfContextUtilsMockedStatic = Mockito.mockStatic(TsfContextUtils.class);
tsfContextUtilsMockedStatic.when(TsfContextUtils::isOnlyTsfConsulEnabled).thenReturn(true);
// reset currentGroupLaneIds
Field currentGroupLaneIdsField = TsfActiveLane.class.getDeclaredField("currentGroupLaneIds");
currentGroupLaneIdsField.setAccessible(true);
currentGroupLaneIdsField.set(tsfActiveLane, new HashSet<>());
kafkaLaneAspect = new KafkaLaneAspect(polarisSDKContextManager, kafkaLaneProperties, tsfActiveLane);
}
@AfterEach
public void tearDown() {
laneUtilsMockedStatic.close();
tsfContextUtilsMockedStatic.close();
}
@Test
public void testProducerAspectWhenLaneDisabled() throws Throwable {
// Given
kafkaLaneProperties.setLaneOn(false);
ProceedingJoinPoint pjp = mock(ProceedingJoinPoint.class);
Object[] args = new Object[] {mock(ProducerRecord.class)};
when(pjp.getArgs()).thenReturn(args);
when(pjp.proceed(args)).thenReturn("result");
// When
Object result = kafkaLaneAspect.aroundProducerMessage(pjp);
// Then
assertThat(result).isEqualTo("result");
verify(pjp).proceed(args);
}
@Test
public void testProducerAspectWhenNoLaneId() throws Throwable {
// Given
kafkaLaneProperties.setLaneOn(true);
laneUtilsMockedStatic.when(() -> LaneUtils.fetchLaneByCaller(any(), any(), any())).thenReturn(null);
ProceedingJoinPoint pjp = mock(ProceedingJoinPoint.class);
Object[] args = new Object[] {mock(ProducerRecord.class)};
when(pjp.getArgs()).thenReturn(args);
when(pjp.proceed(args)).thenReturn("result");
// When
Object result = kafkaLaneAspect.aroundProducerMessage(pjp);
// Then
assertThat(result).isEqualTo("result");
verify(pjp).proceed(args);
}
@Test
public void testProducerAspectWithProducerRecord() throws Throwable {
// Given
kafkaLaneProperties.setLaneOn(true);
String laneId = "test-lane-id";
laneUtilsMockedStatic.when(() -> LaneUtils.fetchLaneByCaller(any(), any(), any())).thenReturn(laneId);
ProceedingJoinPoint pjp = mock(ProceedingJoinPoint.class);
KafkaTemplate kafkaTemplate = mock(KafkaTemplate.class);
ProducerRecord producerRecord = new ProducerRecord<>("test-topic", "test-value");
when(pjp.getTarget()).thenReturn(kafkaTemplate);
when(pjp.getArgs()).thenReturn(new Object[] {producerRecord});
when(kafkaTemplate.send(producerRecord)).thenReturn(null);
// When
kafkaLaneAspect.aroundProducerMessage(pjp);
// Then
Iterator<Header> headers = producerRecord.headers().headers(tsfActiveLane.getLaneHeaderKey()).iterator();
assertThat(headers.hasNext()).isTrue();
Header laneHeader = headers.next();
assertThat(new String(laneHeader.value(), StandardCharsets.UTF_8)).isEqualTo(laneId);
}
@Test
public void testProducerAspectWithMessage() throws Throwable {
// Given
kafkaLaneProperties.setLaneOn(true);
String laneId = "test-lane-id";
laneUtilsMockedStatic.when(() -> LaneUtils.fetchLaneByCaller(any(), any(), any())).thenReturn(laneId);
ProceedingJoinPoint pjp = mock(ProceedingJoinPoint.class);
KafkaTemplate kafkaTemplate = mock(KafkaTemplate.class);
RecordMessageConverter recordMessageConverter = mock(RecordMessageConverter.class);
Message message = MessageBuilder.withPayload("test-payload").build();
ProducerRecord producerRecord = new ProducerRecord<>("test-topic", "test-value");
when(pjp.getTarget()).thenReturn(kafkaTemplate);
when(pjp.getArgs()).thenReturn(new Object[] {message});
when(kafkaTemplate.getDefaultTopic()).thenReturn("test-topic");
when(kafkaTemplate.send(producerRecord)).thenReturn(null);
when(kafkaTemplate.getMessageConverter()).thenReturn(recordMessageConverter);
// When
kafkaLaneAspect.aroundProducerMessage(pjp);
}
@Test
public void testConsumerAspectWhenLaneDisabled() throws Throwable {
// Given
kafkaLaneProperties.setLaneOn(false);
ProceedingJoinPoint pjp = mock(ProceedingJoinPoint.class);
Object[] args = new Object[] {mock(ConsumerRecord.class)};
when(pjp.getArgs()).thenReturn(args);
when(pjp.proceed(args)).thenReturn("result");
// When
Object result = kafkaLaneAspect.aroundConsumerMessage(pjp);
// Then
assertThat(result).isEqualTo("result");
verify(pjp).proceed(args);
}
@Test
public void testConsumerAspectWithLaneHeader_LaneIdExist() throws Throwable {
// Given
kafkaLaneProperties.setLaneOn(true);
Field laneActiveMapField = TsfActiveLane.class.getDeclaredField("laneActiveMap");
laneActiveMapField.setAccessible(true);
HashMap<String, Boolean> laneActiveMap = new HashMap<>();
laneActiveMap.put("lane-test", true);
laneActiveMapField.set(tsfActiveLane, laneActiveMap);
String laneId = "tsf/lane-test"; // valid lane id
ConsumerRecord consumerRecord = new ConsumerRecord<>("test-topic", 0, 0L, "key", "value");
consumerRecord.headers().add(tsfActiveLane.getLaneHeaderKey(), laneId.getBytes(StandardCharsets.UTF_8));
ProceedingJoinPoint pjp = mock(ProceedingJoinPoint.class);
Object[] args = new Object[] {consumerRecord};
when(pjp.getArgs()).thenReturn(args);
when(pjp.proceed(args)).thenReturn("result");
// When
Object result = kafkaLaneAspect.aroundConsumerMessage(pjp);
// Then
assertThat(result).isEqualTo(KafkaLaneAspect.EMPTY_OBJECT);
}
@Test
public void testConsumerAspectWithLaneHeader_LaneIdExist2() throws Throwable {
// Given
kafkaLaneProperties.setLaneOn(true);
Field laneActiveMapField = TsfActiveLane.class.getDeclaredField("laneActiveMap");
laneActiveMapField.setAccessible(true);
HashMap<String, Boolean> laneActiveMap = new HashMap<>();
laneActiveMap.put("lane-test", false);
laneActiveMapField.set(tsfActiveLane, laneActiveMap);
String laneId = "tsf/lane-test"; // valid lane id
ConsumerRecord consumerRecord = new ConsumerRecord<>("test-topic", 0, 0L, "key", "value");
consumerRecord.headers().add(tsfActiveLane.getLaneHeaderKey(), laneId.getBytes(StandardCharsets.UTF_8));
ProceedingJoinPoint pjp = mock(ProceedingJoinPoint.class);
Object[] args = new Object[] {consumerRecord};
when(pjp.getArgs()).thenReturn(args);
when(pjp.proceed(args)).thenReturn("result");
// When
Object result = kafkaLaneAspect.aroundConsumerMessage(pjp);
// Then
assertThat(result).isEqualTo(KafkaLaneAspect.EMPTY_OBJECT);
kafkaLaneProperties.setMainConsumeLane(true);
result = kafkaLaneAspect.aroundConsumerMessage(pjp);
assertThat(result).isEqualTo("result"); // as mainConsumeLane is true
// reset mainConsumeLane
kafkaLaneProperties.setMainConsumeLane(false);
}
@Test
public void testGetConsumerRecordLaneIdFromHeader() {
// Given
ConsumerRecord consumerRecord = new ConsumerRecord<>("test-topic", 0, 0L, "key", "value");
String expectedLaneId = "test-lane-id";
consumerRecord.headers().add(tsfActiveLane.getLaneHeaderKey(), expectedLaneId.getBytes(StandardCharsets.UTF_8));
// When
String laneId = kafkaLaneAspect.getConsumerRecordLaneId(consumerRecord);
// Then
assertThat(laneId).isEqualTo(expectedLaneId);
}
@Test
public void testGetConsumerRecordLaneIdFromCallerLane() {
// Given
ConsumerRecord consumerRecord = new ConsumerRecord<>("test-topic", 0, 0L, "key", "value");
String expectedLaneId = "lane-test";
laneUtilsMockedStatic.when(() -> LaneUtils.getCallerLaneId()).thenReturn(expectedLaneId);
// When
String laneId = kafkaLaneAspect.getConsumerRecordLaneId(consumerRecord);
// Then
assertThat(laneId).isEqualTo("tsf/" + expectedLaneId);
}
/**
* instance not in lane, act as baseline service.
*/
@Test
public void testIfConsumeWithNoLaneId() throws Exception {
// Given
kafkaLaneProperties.setLaneConsumeMain(true);
// When
boolean shouldConsume = kafkaLaneAspect.ifConsume("");
// Then
assertThat(shouldConsume).isTrue(); // baseline service
// Given
kafkaLaneProperties.setLaneConsumeMain(false);
// When
shouldConsume = kafkaLaneAspect.ifConsume("");
// Then
assertThat(shouldConsume).isTrue(); // unrelated to LaneConsumeMain
}
@Test
public void testConsumerAspectWithBatchMessages() throws Throwable {
// Given
kafkaLaneProperties.setLaneOn(true);
List<ConsumerRecord> messageList = new ArrayList<>();
ConsumerRecord record1 = new ConsumerRecord<>("test-topic", 0, 0L, "key", "value1");
record1.headers().add(tsfActiveLane.getLaneHeaderKey(), "lane1".getBytes(StandardCharsets.UTF_8));
ConsumerRecord record2 = new ConsumerRecord<>("test-topic", 0, 1L, "key", "value2");
record2.headers().add(tsfActiveLane.getLaneHeaderKey(), "lane2".getBytes(StandardCharsets.UTF_8));
messageList.add(record1);
messageList.add(record2);
Acknowledgment acknowledgment = mock(Acknowledgment.class);
ProceedingJoinPoint pjp = mock(ProceedingJoinPoint.class);
Object[] args = new Object[] {messageList, acknowledgment};
when(pjp.getArgs()).thenReturn(args);
when(pjp.proceed(any())).thenReturn("result");
// When
Object result = kafkaLaneAspect.aroundConsumerMessage(pjp);
// Then
assertThat(result).isEqualTo("result");
verify(pjp).proceed(any());
}
@Test
public void testConsumerAspectWithEmptyBatch() throws Throwable {
// Given
kafkaLaneProperties.setLaneOn(true);
List<ConsumerRecord> emptyList = new ArrayList<>();
ProceedingJoinPoint pjp = mock(ProceedingJoinPoint.class);
Object[] args = new Object[] {emptyList};
when(pjp.getArgs()).thenReturn(args);
when(pjp.proceed(args)).thenReturn("result");
// When
Object result = kafkaLaneAspect.aroundConsumerMessage(pjp);
// Then
assertThat(result).isEqualTo("result");
verify(pjp).proceed(args);
}
}

@ -0,0 +1,149 @@
/*
* Tencent is pleased to support the open source community by making spring-cloud-tencent available.
*
* Copyright (C) 2021 Tencent. All rights reserved.
*
* Licensed under the BSD 3-Clause License (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.tencent.cloud.plugin.mq.lane.kafka;
import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import com.tencent.cloud.common.tsf.TsfContextUtils;
import com.tencent.cloud.plugin.mq.lane.tsf.TsfActiveLane;
import com.tencent.cloud.polaris.context.PolarisSDKContextManager;
import com.tencent.cloud.polaris.discovery.PolarisDiscoveryHandler;
import com.tencent.polaris.plugins.router.lane.LaneUtils;
import com.tencent.polaris.specification.api.v1.traffic.manage.LaneProto;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.aspectj.lang.ProceedingJoinPoint;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
/**
* Test for {@link KafkaLaneAspect}.
* Instance and service status change.
*/
public class TsfKafkaLaneAspectTest3 {
private KafkaLaneAspect kafkaLaneAspect;
private PolarisSDKContextManager polarisSDKContextManager;
private KafkaLaneProperties kafkaLaneProperties;
private TsfActiveLane tsfActiveLane;
private LaneProto.LaneGroup group;
private MockedStatic<LaneUtils> laneUtilsMockedStatic;
private MockedStatic<TsfContextUtils> tsfContextUtilsMockedStatic;
@BeforeEach
public void setUp() throws Exception {
polarisSDKContextManager = mock(PolarisSDKContextManager.class);
group = mock(LaneProto.LaneGroup.class);
kafkaLaneProperties = new KafkaLaneProperties();
kafkaLaneProperties.setLaneOn(true);
tsfActiveLane = new TsfActiveLane(mock(PolarisSDKContextManager.class), mock(PolarisDiscoveryHandler.class));
laneUtilsMockedStatic = Mockito.mockStatic(LaneUtils.class);
tsfContextUtilsMockedStatic = Mockito.mockStatic(TsfContextUtils.class);
kafkaLaneAspect = new KafkaLaneAspect(polarisSDKContextManager, kafkaLaneProperties, tsfActiveLane);
}
@AfterEach
public void tearDown() {
laneUtilsMockedStatic.close();
tsfContextUtilsMockedStatic.close();
}
@Test
public void testConsumerAspectWithLaneHeader_toInLane() throws Throwable {
// not in lane
Field laneActiveMapField = TsfActiveLane.class.getDeclaredField("laneActiveMap");
laneActiveMapField.setAccessible(true);
HashMap<String, Boolean> laneActiveMap = new HashMap<>();
laneActiveMap.put("lane-name", true);
laneActiveMapField.set(tsfActiveLane, laneActiveMap);
String laneId = "tsf/lane-name"; // valid lane id
ConsumerRecord consumerRecord = new ConsumerRecord<>("test-topic", 0, 0L, "key", "value");
consumerRecord.headers().add(tsfActiveLane.getLaneHeaderKey(), laneId.getBytes(StandardCharsets.UTF_8));
ProceedingJoinPoint pjp = mock(ProceedingJoinPoint.class);
Object[] args = new Object[] {consumerRecord};
when(pjp.getArgs()).thenReturn(args);
when(pjp.proceed(args)).thenReturn("result");
// act
Object result = kafkaLaneAspect.aroundConsumerMessage(pjp);
// verify
assertThat(result).isEqualTo(KafkaLaneAspect.EMPTY_OBJECT); // not in lane, not consume
// in lane
Field currentGroupLaneIdsField = TsfActiveLane.class.getDeclaredField("currentGroupLaneIds");
currentGroupLaneIdsField.setAccessible(true);
currentGroupLaneIdsField.set(tsfActiveLane, new HashSet<>(Collections.singletonList("lane-name")));
result = kafkaLaneAspect.aroundConsumerMessage(pjp);
assertThat(result).isEqualTo("result");
}
@Test
public void testConsumerAspectWithLaneHeader_toNotInLane() throws Throwable {
// in lane
String laneId = "tsf/lane-name"; // valid lane id
Field currentGroupLaneIdsField = TsfActiveLane.class.getDeclaredField("currentGroupLaneIds");
currentGroupLaneIdsField.setAccessible(true);
currentGroupLaneIdsField.set(tsfActiveLane, new HashSet<>(Collections.singletonList("lane-name")));
Field laneActiveMapField = TsfActiveLane.class.getDeclaredField("laneActiveMap");
laneActiveMapField.setAccessible(true);
HashMap<String, Boolean> laneActiveMap = new HashMap<>();
laneActiveMap.put("lane-name", true);
laneActiveMapField.set(tsfActiveLane, laneActiveMap);
ConsumerRecord consumerRecord = new ConsumerRecord<>("test-topic", 0, 0L, "key", "value");
consumerRecord.headers().add(tsfActiveLane.getLaneHeaderKey(), laneId.getBytes(StandardCharsets.UTF_8));
ProceedingJoinPoint pjp = mock(ProceedingJoinPoint.class);
Object[] args = new Object[] {consumerRecord};
when(pjp.getArgs()).thenReturn(args);
when(pjp.proceed(args)).thenReturn("result");
// act
Object result = kafkaLaneAspect.aroundConsumerMessage(pjp);
// verify
assertThat(result).isEqualTo("result");
// not in lane
currentGroupLaneIdsField.set(tsfActiveLane, new HashSet<>());
result = kafkaLaneAspect.aroundConsumerMessage(pjp);
assertThat(result).isEqualTo(KafkaLaneAspect.EMPTY_OBJECT);
}
}

@ -0,0 +1,313 @@
/*
* Tencent is pleased to support the open source community by making spring-cloud-tencent available.
*
* Copyright (C) 2021 Tencent. All rights reserved.
*
* Licensed under the BSD 3-Clause License (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.tencent.cloud.plugin.mq.lane.tsf;
import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import com.tencent.cloud.common.util.JacksonUtils;
import com.tencent.cloud.plugin.mq.lane.kafka.KafkaLaneProperties;
import com.tencent.cloud.polaris.context.PolarisSDKContextManager;
import com.tencent.cloud.polaris.discovery.PolarisDiscoveryHandler;
import com.tencent.polaris.api.plugin.compose.Extensions;
import com.tencent.polaris.api.pojo.Instance;
import com.tencent.polaris.api.pojo.ServiceKey;
import com.tencent.polaris.client.api.SDKContext;
import com.tencent.polaris.metadata.core.constant.TsfMetadataConstants;
import com.tencent.polaris.plugins.router.lane.LaneUtils;
import com.tencent.polaris.specification.api.v1.traffic.manage.LaneProto;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.springframework.test.util.ReflectionTestUtils;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
/**
* Test for {@link TsfActiveLane}.
*/
public class TsfActiveLaneTest {
private TsfActiveLane tsfActiveLane;
private PolarisSDKContextManager polarisSDKContextManager;
private PolarisDiscoveryHandler discoveryClient;
private KafkaLaneProperties kafkaLaneProperties;
private SDKContext sdkContext;
private Extensions extensions;
private MockedStatic<LaneUtils> laneUtilsMockedStatic;
private MockedStatic<JacksonUtils> jacksonUtilsMockedStatic;
@BeforeEach
public void setUp() {
polarisSDKContextManager = mock(PolarisSDKContextManager.class);
discoveryClient = mock(PolarisDiscoveryHandler.class);
sdkContext = mock(SDKContext.class);
extensions = mock(Extensions.class);
kafkaLaneProperties = new KafkaLaneProperties();
laneUtilsMockedStatic = Mockito.mockStatic(LaneUtils.class);
jacksonUtilsMockedStatic = Mockito.mockStatic(JacksonUtils.class);
jacksonUtilsMockedStatic.when(() -> JacksonUtils.serialize2Json(any())).thenReturn("{}");
when(polarisSDKContextManager.getSDKContext()).thenReturn(sdkContext);
when(sdkContext.getExtensions()).thenReturn(extensions);
tsfActiveLane = new TsfActiveLane(polarisSDKContextManager, discoveryClient);
// Set up field values using reflection
ReflectionTestUtils.setField(tsfActiveLane, "tsfNamespaceId", "test-namespace");
ReflectionTestUtils.setField(tsfActiveLane, "tsfGroupId", "test-group");
ReflectionTestUtils.setField(tsfActiveLane, "tsfApplicationId", "test-app");
ReflectionTestUtils.setField(tsfActiveLane, "springApplicationName", "test-service");
}
@AfterEach
public void tearDown() {
laneUtilsMockedStatic.close();
jacksonUtilsMockedStatic.close();
}
@Test
public void testCallbackWithEmptyInstances() {
// Given
List<Instance> currentInstances = Collections.emptyList();
// When
tsfActiveLane.callback(currentInstances);
// Then
// Should not throw any exceptions and handle empty instances gracefully
}
@Test
public void testFreshLaneStatusWithActiveGroups() {
// Given
Set<String> activeGroups = new HashSet<>(Arrays.asList("group1", "group2"));
ReflectionTestUtils.setField(tsfActiveLane, "activeGroupSet", activeGroups);
// Mock lane groups and rules
LaneProto.LaneGroup laneGroup = mock(LaneProto.LaneGroup.class);
LaneProto.LaneRule laneRule = mock(LaneProto.LaneRule.class);
when(laneGroup.getMetadataMap()).thenReturn(createMetadataMap("test-namespace,test-app"));
when(laneGroup.getRulesList()).thenReturn(Collections.singletonList(laneRule));
when(laneRule.getLabelKey()).thenReturn(TsfMetadataConstants.TSF_GROUP_ID);
when(laneRule.getDefaultLabelValue()).thenReturn("test-group,group2,group3");
when(laneRule.getId()).thenReturn("lane1");
laneUtilsMockedStatic.when(() -> LaneUtils.getLaneGroups(any(ServiceKey.class), any(Extensions.class)))
.thenReturn(Collections.singletonList(laneGroup));
// When
ReflectionTestUtils.invokeMethod(tsfActiveLane, "freshLaneStatus");
// Then
Map<String, Boolean> laneActiveMap = (Map<String, Boolean>) ReflectionTestUtils.getField(tsfActiveLane, "laneActiveMap");
Set<String> currentGroupLaneIds = (Set<String>) ReflectionTestUtils.getField(tsfActiveLane, "currentGroupLaneIds");
assertThat(laneActiveMap).containsEntry("lane1", true);
assertThat(currentGroupLaneIds).contains("lane1");
}
@Test
public void testFreshLaneStatusWithInactiveGroups() {
// Given
Set<String> activeGroups = new HashSet<>(Collections.singletonList("group1"));
ReflectionTestUtils.setField(tsfActiveLane, "activeGroupSet", activeGroups);
// Mock lane groups and rules
LaneProto.LaneGroup laneGroup = mock(LaneProto.LaneGroup.class);
LaneProto.LaneRule laneRule = mock(LaneProto.LaneRule.class);
when(laneGroup.getMetadataMap()).thenReturn(createMetadataMap("test-namespace,test-app"));
when(laneGroup.getRulesList()).thenReturn(Collections.singletonList(laneRule));
when(laneRule.getLabelKey()).thenReturn(TsfMetadataConstants.TSF_GROUP_ID);
when(laneRule.getDefaultLabelValue()).thenReturn("group2,group3");
when(laneRule.getId()).thenReturn("lane1");
laneUtilsMockedStatic.when(() -> LaneUtils.getLaneGroups(any(ServiceKey.class), any(Extensions.class)))
.thenReturn(Collections.singletonList(laneGroup));
// When
ReflectionTestUtils.invokeMethod(tsfActiveLane, "freshLaneStatus");
// Then
Map<String, Boolean> laneActiveMap = (Map<String, Boolean>) ReflectionTestUtils.getField(tsfActiveLane, "laneActiveMap");
assertThat(laneActiveMap).containsEntry("lane1", false);
}
@Test
public void testIsLaneExist() {
// Given
Map<String, Boolean> laneActiveMap = new HashMap<>();
laneActiveMap.put("lane1", true);
laneActiveMap.put("lane2", false);
ReflectionTestUtils.setField(tsfActiveLane, "laneActiveMap", laneActiveMap);
// When & Then
assertThat(tsfActiveLane.isLaneExist("lane1")).isTrue();
assertThat(tsfActiveLane.isLaneExist("lane2")).isTrue();
assertThat(tsfActiveLane.isLaneExist("lane3")).isFalse();
}
@Test
public void testIsActiveLane() {
// Given
Map<String, Boolean> laneActiveMap = new HashMap<>();
laneActiveMap.put("lane1", true);
laneActiveMap.put("lane2", false);
ReflectionTestUtils.setField(tsfActiveLane, "laneActiveMap", laneActiveMap);
// When & Then
assertThat(tsfActiveLane.isActiveLane("lane1")).isTrue();
assertThat(tsfActiveLane.isActiveLane("lane2")).isFalse();
assertThat(tsfActiveLane.isActiveLane("lane3")).isFalse();
}
@Test
public void testGetCurrentGroupLaneIds() {
// Given
Set<String> currentGroupLaneIds = new TreeSet<>(Arrays.asList("lane1", "lane2"));
ReflectionTestUtils.setField(tsfActiveLane, "currentGroupLaneIds", currentGroupLaneIds);
// When
Set<String> result = tsfActiveLane.getCurrentGroupLaneIds();
// Then
assertThat(result).containsExactly("lane1", "lane2");
}
@Test
public void testFreshWhenInstancesChangeWithEmptyList() {
// Given
List<Instance> emptyInstances = Collections.emptyList();
// When
ReflectionTestUtils.invokeMethod(tsfActiveLane, "freshWhenInstancesChange", emptyInstances);
// Then
// Should handle empty list gracefully without exceptions
}
@Test
public void testFreshWhenInstancesChangeWithValidInstances() {
// Given
Instance instance = mock(Instance.class);
when(instance.getMetadata()).thenReturn(createMetadata("test-namespace", "group1", "test-app"));
List<Instance> instances = Collections.singletonList(instance);
// When
ReflectionTestUtils.invokeMethod(tsfActiveLane, "freshWhenInstancesChange", instances);
// Then
Set<String> activeGroupSet = (Set<String>) ReflectionTestUtils.getField(tsfActiveLane, "activeGroupSet");
assertThat(activeGroupSet).contains("group1");
}
@Test
public void testCallback() throws Throwable {
Field tsfNamespaceIdField = TsfActiveLane.class.getDeclaredField("tsfNamespaceId");
tsfNamespaceIdField.setAccessible(true);
tsfNamespaceIdField.set(tsfActiveLane, "ns1");
Field tsfGroupIdField = TsfActiveLane.class.getDeclaredField("tsfGroupId");
tsfGroupIdField.setAccessible(true);
tsfGroupIdField.set(tsfActiveLane, "group1");
Field tsfApplicationIdField = TsfActiveLane.class.getDeclaredField("tsfApplicationId");
tsfApplicationIdField.setAccessible(true);
tsfApplicationIdField.set(tsfActiveLane, "app1");
// not in lane
assertThat(tsfActiveLane.getCurrentGroupLaneIds()).isEmpty();
// in lane
// given
LaneProto.LaneGroup group = mock(LaneProto.LaneGroup.class);
LaneProto.LaneRule laneRule = mock(LaneProto.LaneRule.class);
Map<String, String> metadataMap = new HashMap<>();
metadataMap.put("ns1,app1", TsfMetadataConstants.TSF_NAMESPACE_ID + "," + TsfMetadataConstants.TSF_APPLICATION_ID);
when(group.getMetadataMap()).thenReturn(metadataMap);
when(group.getRulesList()).thenReturn(Collections.singletonList(laneRule));
when(laneRule.getLabelKey()).thenReturn(TsfMetadataConstants.TSF_GROUP_ID);
when(laneRule.getDefaultLabelValue()).thenReturn("group1");
when(laneRule.getId()).thenReturn("lane1");
laneUtilsMockedStatic.when(() -> LaneUtils.getLaneGroups(any(), any()))
.thenReturn(Collections.singletonList(group));
Map<String, String> metadata = new HashMap<>();
metadata.put("TSF_NAMESPACE_ID", "ns1");
metadata.put("TSF_GROUP_ID", "group1");
metadata.put("TSF_APPLICATION_ID", "app1");
Instance instance = mock(Instance.class);
when(instance.getMetadata()).thenReturn(metadata);
// act
tsfActiveLane.callback(Collections.singletonList(instance));
assertThat(tsfActiveLane.getCurrentGroupLaneIds().contains("lane1")).isTrue();
// instance not in lane, change the rule
when(laneRule.getDefaultLabelValue()).thenReturn("group2");
tsfActiveLane.freshLaneStatus(); // rule listener will call this method
assertThat(tsfActiveLane.getCurrentGroupLaneIds().contains("lane1")).isFalse();
// reset, instance in lane
when(laneRule.getDefaultLabelValue()).thenReturn("group1");
tsfActiveLane.freshLaneStatus(); // rule listener will call this method
assertThat(tsfActiveLane.getCurrentGroupLaneIds().contains("lane1")).isTrue();
// service not in lane
metadataMap.clear();
metadataMap.put("ns1,app2", TsfMetadataConstants.TSF_NAMESPACE_ID + "," + TsfMetadataConstants.TSF_APPLICATION_ID);
when(laneRule.getDefaultLabelValue()).thenReturn("group2");
tsfActiveLane.freshLaneStatus(); // rule listener will call this method
assertThat(tsfActiveLane.getCurrentGroupLaneIds().contains("lane1")).isFalse();
}
private Map<String, String> createMetadata(String namespaceId, String groupId, String applicationId) {
Map<String, String> metadata = new HashMap<>();
metadata.put("TSF_NAMESPACE_ID", namespaceId);
metadata.put("TSF_GROUP_ID", groupId);
metadata.put("TSF_APPLICATION_ID", applicationId);
return metadata;
}
private Map<String, String> createMetadataMap(String value) {
Map<String, String> metadataMap = new HashMap<>();
metadataMap.put("test-namespace,test-app", value);
return metadataMap;
}
}
Loading…
Cancel
Save