feat: support kafka lane

pull/1765/head
shedfreewu 4 weeks ago
parent 13ee696b9f
commit eb2ed0f7d4

@ -101,6 +101,7 @@ public class ServiceInstanceChangeCallbackManager implements ApplicationListener
if (clz.isAnnotationPresent(ServiceInstanceChangeListener.class)) {
ServiceInstanceChangeListener serviceInstanceChangeListener = clz.getAnnotation(ServiceInstanceChangeListener.class);
serviceName = serviceInstanceChangeListener.serviceName();
serviceName = ApplicationContextAwareUtils.getApplicationContext().getEnvironment().resolvePlaceholders(serviceName);
}
if (StringUtils.isBlank(serviceName)) {

@ -240,6 +240,12 @@
<version>${revision}</version>
</dependency>
<dependency>
<groupId>com.tencent.cloud</groupId>
<artifactId>spring-cloud-starter-tencent-kafka-lane-plugin</artifactId>
<version>${revision}</version>
</dependency>
<!-- third part framework dependencies -->
<dependency>
<groupId>org.springdoc</groupId>

@ -26,6 +26,7 @@
<module>spring-cloud-starter-tencent-multi-discovery-plugin</module>
<module>spring-cloud-starter-tencent-traffic-mirroring-plugin</module>
<module>spring-cloud-starter-tencent-fault-injection-plugin</module>
<module>spring-cloud-starter-tencent-kafka-lane-plugin</module>
</modules>
</project>

@ -0,0 +1,66 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>spring-cloud-tencent-plugin-starters</artifactId>
<groupId>com.tencent.cloud</groupId>
<version>${revision}</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>spring-cloud-starter-tencent-kafka-lane-plugin</artifactId>
<name>Spring Cloud Starter Tencent Kafka Lane plugin</name>
<dependencies>
<dependency>
<groupId>com.tencent.polaris</groupId>
<artifactId>polaris-all</artifactId>
</dependency>
<dependency>
<groupId>com.tencent.cloud</groupId>
<artifactId>spring-cloud-tencent-commons</artifactId>
</dependency>
<dependency>
<groupId>com.tencent.cloud</groupId>
<artifactId>spring-cloud-tencent-polaris-context</artifactId>
</dependency>
<dependency>
<groupId>com.tencent.cloud</groupId>
<artifactId>spring-cloud-starter-tencent-polaris-discovery</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-aop</artifactId>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<scope>provided</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-json</artifactId>
<scope>provided</scope>
<optional>true</optional>
</dependency>
</dependencies>
</project>

@ -0,0 +1,367 @@
/*
* Tencent is pleased to support the open source community by making spring-cloud-tencent available.
*
* Copyright (C) 2021 Tencent. All rights reserved.
*
* Licensed under the BSD 3-Clause License (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.tencent.cloud.plugin.kafka;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import com.tencent.cloud.common.metadata.MetadataContext;
import com.tencent.cloud.common.metadata.MetadataContextHolder;
import com.tencent.cloud.common.tsf.TsfContextUtils;
import com.tencent.cloud.plugin.kafka.tsf.TsfActiveLane;
import com.tencent.cloud.polaris.context.PolarisSDKContextManager;
import com.tencent.polaris.api.pojo.ServiceKey;
import com.tencent.polaris.api.utils.StringUtils;
import com.tencent.polaris.client.api.SDKContext;
import com.tencent.polaris.plugins.router.lane.LaneRouter;
import com.tencent.polaris.plugins.router.lane.LaneUtils;
import com.tencent.polaris.specification.api.v1.traffic.manage.LaneProto;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.core.annotation.Order;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
@Aspect
@Order(1)
public class KafkaLaneAspect {
private static final Logger LOG = LoggerFactory.getLogger(KafkaLaneAspect.class);
/**
* Empty object.
*/
public static final Object EMPTY_OBJECT = new Object();
private static final String TSF_LANE_ID = "tsf_laneId";
private final PolarisSDKContextManager polarisSDKContextManager;
private final KafkaLaneProperties kafkaLaneProperties;
private final TsfActiveLane tsfActiveLane;
private final String laneHeaderKey;
@Value("${spring.cloud.tencent.metadata.content.lane:}")
private String lane;
public KafkaLaneAspect(PolarisSDKContextManager polarisSDKContextManager, KafkaLaneProperties kafkaLaneProperties, TsfActiveLane tsfActiveLane) {
this.polarisSDKContextManager = polarisSDKContextManager;
this.kafkaLaneProperties = kafkaLaneProperties;
this.tsfActiveLane = tsfActiveLane;
laneHeaderKey = TsfContextUtils.isOnlyTsfConsulEnabled() ? TSF_LANE_ID : MetadataContext.DEFAULT_TRANSITIVE_PREFIX + LaneRouter.TRAFFIC_STAIN_LABEL;
}
@Pointcut("execution(* org.springframework.kafka.core.KafkaTemplate.send(..))")
private void producerPointcut() {
}
@Around("producerPointcut()")
public Object aroundProducerMessage(ProceedingJoinPoint pjp) throws Throwable {
Object[] args = pjp.getArgs();
KafkaTemplate target = (KafkaTemplate) pjp.getTarget();
if (!this.kafkaLaneProperties.getLaneOn()) {
return pjp.proceed(args);
}
String laneId = LaneUtils.fetchLaneByCaller(
Optional.ofNullable(polarisSDKContextManager).map(PolarisSDKContextManager::getSDKContext).map(SDKContext::getExtensions).orElse(null),
com.tencent.cloud.common.metadata.MetadataContext.LOCAL_NAMESPACE,
com.tencent.cloud.common.metadata.MetadataContext.LOCAL_SERVICE);
if (StringUtils.isBlank(laneId) || !kafkaLaneProperties.getAutoSetHeader()) {
return pjp.proceed(args);
}
LOG.debug("kafka producer lane before, args: {}, thread laneId: {}", args, laneId);
try {
ProducerRecord producerRecord;
if (args.length == 1) {
// ListenableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record); ListenableFuture<SendResult<K, V>> send(Message<?> message);
if (args[0] instanceof Message) {
Message message = (Message) args[0];
producerRecord = target.getMessageConverter().fromMessage(message, target.getDefaultTopic());
// possibly no Jackson
if (!producerRecord.headers().iterator().hasNext()) {
byte[] correlationId = message.getHeaders().get(KafkaHeaders.CORRELATION_ID, byte[].class);
if (correlationId != null) {
producerRecord.headers().add(KafkaHeaders.CORRELATION_ID, correlationId);
}
}
}
else {
producerRecord = (ProducerRecord) args[0];
}
}
else if (args.length == 2) {
// ListenableFuture<SendResult<K, V>> send(String topic, V data);
producerRecord = new ProducerRecord<>((String) args[0], args[1]);
}
else if (args.length == 3) {
// ListenableFuture<SendResult<K, V>> send(String topic, K key, V data);
producerRecord = new ProducerRecord<>((String) args[0], args[1], args[2]);
}
else if (args.length == 4) {
// ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data);
producerRecord = new ProducerRecord<>((String) args[0], (Integer) args[1], args[2], args[3]);
}
else if (args.length == 5) {
// ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data);
producerRecord = new ProducerRecord<>((String) args[0], (Integer) args[1], (Long) args[2], args[3], args[4]);
}
else {
LOG.error("KafkaTemplate send message with wrong args: {}", args);
return pjp.proceed(args);
}
Header laneHeader = new RecordHeader(laneHeaderKey, laneId.getBytes(StandardCharsets.UTF_8));
producerRecord.headers().add(laneHeader);
LOG.debug("kafka producer lane after, args: {}, laneId: {}", producerRecord, laneId);
return target.send(producerRecord);
}
catch (Exception e) {
LOG.error("add laneId to kafka message error", e);
}
return pjp.proceed(args);
}
@Pointcut("@annotation(org.springframework.kafka.annotation.KafkaListener)")
private void consumerPointcut() {
}
@Around("consumerPointcut()")
public Object aroundConsumerMessage(ProceedingJoinPoint pjp) throws Throwable {
Object[] args = pjp.getArgs();
if (!this.kafkaLaneProperties.getLaneOn()) {
return pjp.proceed(args);
}
// init metadata context
MetadataContextHolder.get();
try {
ConsumerRecord consumerRecord = null;
List messageList = null;
int dataPosition = -1;
Acknowledgment acknowledgment = null;
for (int i = 0; i < args.length; i++) {
if ((args[i] instanceof Acknowledgment)) {
acknowledgment = (Acknowledgment) args[i];
}
else if ((args[i] instanceof ConsumerRecord)) {
consumerRecord = (ConsumerRecord) args[i];
dataPosition = i;
}
else if (args[i] instanceof List) {
messageList = (List) args[i];
dataPosition = i;
}
}
// parameter is message list, for batch consume
if (messageList != null) {
// empty list directly return
if (messageList.isEmpty()) {
return pjp.proceed(args);
}
// parameter is not consumerRecord, consume directly
if (!(messageList.get(0) instanceof ConsumerRecord)) {
return pjp.proceed(args);
}
List<ConsumerRecord> newMessageList = new ArrayList<>();
for (Object item : messageList) {
ConsumerRecord record = (ConsumerRecord) item;
String laneId = this.getConsumerRecordLaneId(record);
boolean ifConsume = this.ifConsume(laneId);
if (ifConsume) {
newMessageList.add(record);
}
else {
if (acknowledgment != null) {
acknowledgment.acknowledge();
}
if (LOG.isDebugEnabled()) {
LOG.debug("not need consume, laneId: {}, message:{}", laneId, record);
}
}
}
args[dataPosition] = newMessageList;
}
// parameter is consumerRecord
if (consumerRecord != null) {
String laneId = this.getConsumerRecordLaneId(consumerRecord);
boolean ifConsume = this.ifConsume(laneId);
if (!ifConsume) {
if (acknowledgment != null) {
acknowledgment.acknowledge();
}
if (LOG.isDebugEnabled()) {
LOG.debug("not need consume, laneId: {}, message:{}", laneId, consumerRecord);
}
return EMPTY_OBJECT;
}
}
}
catch (Exception e) {
LOG.error("extract laneId from kafka message error", e);
}
Object result = pjp.proceed(args);
LaneUtils.removeCallerLaneId();
return result;
}
String getConsumerRecordLaneId(ConsumerRecord consumerRecord) {
String laneId = null;
Iterator<Header> iterator = consumerRecord.headers().headers(laneHeaderKey).iterator();
if (iterator.hasNext()) {
laneId = new String(iterator.next().value(), StandardCharsets.UTF_8);
}
// falls back to the laneId from the metadata context (set by the user's aspect) if the header is empty
if (StringUtils.isBlank(laneId)) {
laneId = LaneUtils.getCallerLaneId();
// format laneId
if (laneId != null && !laneId.contains("/") && laneId.startsWith("lane-")) {
laneId = "tsf/" + laneId;
}
}
return laneId;
}
/**
* whether the message is consumed by the current listener.
* @param messageLaneId message lane id.
* @return whether to consume.
*/
boolean ifConsume(String messageLaneId) {
if (TsfContextUtils.isOnlyTsfConsulEnabled() && tsfActiveLane != null) {
return ifConsumeInTsf(messageLaneId);
}
else {
return ifConsumeInPolaris(messageLaneId);
}
}
private boolean ifConsumeInTsf(String originMessageLaneId) {
String laneId = originMessageLaneId;
if (laneId != null && laneId.contains("/")) {
laneId = laneId.split("/")[1];
}
Set<String> groupLaneIdSet = tsfActiveLane.getCurrentGroupLaneIds();
// message has no lane id
if (StringUtils.isEmpty(laneId)) {
if (groupLaneIdSet.isEmpty()) {
// baseline service, consume directly
return true;
}
else {
// lane listener consumes baseline message
return this.kafkaLaneProperties.getLaneConsumeMain();
}
}
else {
LaneUtils.setCallerLaneId(originMessageLaneId);
// message has lane id
if (groupLaneIdSet.isEmpty()) {
// baseline service
// message carries lane id but the current service's lane has no deployment groups, consume baseline
boolean consume = !tsfActiveLane.isLaneExist(laneId);
// message carries lane id, but the current service's lane has deployment groups but is not active (or manually taken offline), consume baseline based on switch configuration, default is not to consume
consume = consume ||
(this.kafkaLaneProperties.getMainConsumeLane() &&
tsfActiveLane.isLaneExist(laneId) &&
!tsfActiveLane.isActiveLane(laneId)
);
return consume;
}
else {
return groupLaneIdSet.contains(laneId);
}
}
}
private boolean ifConsumeInPolaris(String messageLaneId) {
// message has no lane id
if (StringUtils.isEmpty(messageLaneId)) {
if (StringUtils.isEmpty(lane)) {
// baseline service, consume directly
return true;
}
else {
// lane listener consumes baseline message
return this.kafkaLaneProperties.getLaneConsumeMain();
}
}
else {
LaneUtils.setCallerLaneId(messageLaneId);
// message has lane id
if (StringUtils.isEmpty(lane)) {
// baseline service
return this.kafkaLaneProperties.getMainConsumeLane();
}
else {
ServiceKey localService = new ServiceKey(com.tencent.cloud.common.metadata.MetadataContext.LOCAL_NAMESPACE,
com.tencent.cloud.common.metadata.MetadataContext.LOCAL_SERVICE);
Collection<LaneProto.LaneGroup> groups = LaneUtils.getLaneGroups(localService, polarisSDKContextManager.getSDKContext().getExtensions());
// whether the message lane id is the same as the lane id of the listener
for (LaneProto.LaneGroup group : groups) {
for (LaneProto.LaneRule rule : group.getRulesList()) {
if (StringUtils.equals(messageLaneId, LaneUtils.buildStainLabel(rule))
&& StringUtils.equals(rule.getDefaultLabelValue(), lane)) {
return true;
}
}
}
return false;
}
}
}
}

@ -0,0 +1,48 @@
/*
* Tencent is pleased to support the open source community by making spring-cloud-tencent available.
*
* Copyright (C) 2021 Tencent. All rights reserved.
*
* Licensed under the BSD 3-Clause License (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.tencent.cloud.plugin.kafka;
import com.tencent.cloud.common.tsf.ConditionalOnOnlyTsfConsulEnabled;
import com.tencent.cloud.plugin.kafka.tsf.TsfActiveLane;
import com.tencent.cloud.polaris.context.PolarisSDKContextManager;
import com.tencent.cloud.polaris.discovery.PolarisDiscoveryHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties(KafkaLaneProperties.class)
public class KafkaLaneAspectConfiguration {
@Bean
@ConditionalOnClass(name = {"org.springframework.kafka.core.KafkaTemplate"})
public KafkaLaneAspect kafkaLaneAspect(PolarisSDKContextManager polarisSDKContextManager,
KafkaLaneProperties kafkaLaneProperties, @Autowired(required = false) TsfActiveLane tsfActiveLane) {
return new KafkaLaneAspect(polarisSDKContextManager, kafkaLaneProperties, tsfActiveLane);
}
@Bean
@ConditionalOnClass(name = {"org.springframework.kafka.core.KafkaTemplate"})
@ConditionalOnOnlyTsfConsulEnabled
public TsfActiveLane tsfActiveLane(PolarisSDKContextManager polarisSDKContextManager, PolarisDiscoveryHandler discoveryClient) {
return new TsfActiveLane(polarisSDKContextManager, discoveryClient);
}
}

@ -0,0 +1,80 @@
/*
* Tencent is pleased to support the open source community by making spring-cloud-tencent available.
*
* Copyright (C) 2021 Tencent. All rights reserved.
*
* Licensed under the BSD 3-Clause License (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.tencent.cloud.plugin.kafka;
import org.springframework.boot.context.properties.ConfigurationProperties;
@ConfigurationProperties(prefix = "spring.cloud.polaris.lane.kafka")
public class KafkaLaneProperties {
/**
* enable kafka lane.
*/
private Boolean laneOn = false;
/**
* lane listener whether to consume main message.
* scene: message without lane label, listener service with lane label, force lane service to consume main message.
* default false.
*/
private Boolean laneConsumeMain = false;
/**
* main listener whether to consume lane message.
* scene: message with lane label, lane not deployed or not online, main consume lane message.
* default false.
*/
private Boolean mainConsumeLane = false;
/**
* whether to set lane id to message header.
*/
private Boolean autoSetHeader = true;
public Boolean getLaneOn() {
return laneOn;
}
public void setLaneOn(Boolean laneOn) {
this.laneOn = laneOn;
}
public Boolean getLaneConsumeMain() {
return laneConsumeMain;
}
public void setLaneConsumeMain(Boolean laneConsumeMain) {
this.laneConsumeMain = laneConsumeMain;
}
public Boolean getMainConsumeLane() {
return mainConsumeLane;
}
public void setMainConsumeLane(Boolean mainConsumeLane) {
this.mainConsumeLane = mainConsumeLane;
}
public Boolean getAutoSetHeader() {
return autoSetHeader;
}
public void setAutoSetHeader(Boolean autoSetHeader) {
this.autoSetHeader = autoSetHeader;
}
}

@ -0,0 +1,186 @@
/*
* Tencent is pleased to support the open source community by making spring-cloud-tencent available.
*
* Copyright (C) 2021 Tencent. All rights reserved.
*
* Licensed under the BSD 3-Clause License (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.tencent.cloud.plugin.kafka.tsf;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import com.tencent.cloud.common.util.JacksonUtils;
import com.tencent.cloud.polaris.context.PolarisSDKContextManager;
import com.tencent.cloud.polaris.discovery.PolarisDiscoveryHandler;
import com.tencent.cloud.polaris.discovery.refresh.ServiceInstanceChangeCallback;
import com.tencent.cloud.polaris.discovery.refresh.ServiceInstanceChangeListener;
import com.tencent.polaris.api.plugin.compose.Extensions;
import com.tencent.polaris.api.pojo.Instance;
import com.tencent.polaris.api.pojo.ServiceKey;
import com.tencent.polaris.api.utils.StringUtils;
import com.tencent.polaris.client.api.SDKContext;
import com.tencent.polaris.metadata.core.constant.TsfMetadataConstants;
import com.tencent.polaris.plugins.router.lane.LaneUtils;
import com.tencent.polaris.specification.api.v1.traffic.manage.LaneProto;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Value;
@ServiceInstanceChangeListener(serviceName = "${spring.application.name}")
public class TsfActiveLane implements ServiceInstanceChangeCallback, InitializingBean {
private static final Logger LOG = LoggerFactory.getLogger(TsfActiveLane.class);
private final PolarisSDKContextManager polarisSDKContextManager;
private final PolarisDiscoveryHandler discoveryClient;
/**
* Online deployment groups for this service (same namespace id and application id required).
*/
private volatile Set<String> activeGroupSet = new HashSet<>();
private volatile Set<String> currentGroupLaneIds = null;
/**
* key: laneId.
* value: true - online, false - offline.
*/
private volatile Map<String, Boolean> laneActiveMap = new HashMap<>();
@Value("${tsf_namespace_id:}")
private String tsfNamespaceId;
@Value("${tsf_group_id:}")
private String tsfGroupId;
@Value("${tsf_application_id:}")
private String tsfApplicationId;
@Value("${spring.application.name:}")
private String springApplicationName;
public TsfActiveLane(PolarisSDKContextManager polarisSDKContextManager, PolarisDiscoveryHandler discoveryClient) {
this.polarisSDKContextManager = polarisSDKContextManager;
this.discoveryClient = discoveryClient;
Optional.ofNullable(polarisSDKContextManager).map(PolarisSDKContextManager::getSDKContext)
.map(SDKContext::getExtensions).map(Extensions::getLocalRegistry)
.ifPresent(localRegistry -> localRegistry.registerResourceListener(new TsfLaneRuleListener(this)));
}
@Override
public void afterPropertiesSet() {
// get instances to trigger callback when instances change
discoveryClient.getHealthyInstances(springApplicationName);
}
@Override
public void callback(List<Instance> currentServiceInstances, List<Instance> addServiceInstances, List<Instance> deleteServiceInstances) {
if (LOG.isDebugEnabled()) {
LOG.debug("ConsulServiceChangeCallback currentServices: {}", JacksonUtils.serialize2Json(currentServiceInstances));
LOG.debug("current namespaceId: {}, groupId: {}, applicationId: {}", tsfNamespaceId, tsfGroupId, tsfApplicationId);
}
freshWhenInstancesChange(currentServiceInstances);
if (LOG.isDebugEnabled()) {
LOG.info("current lane active status: {}", JacksonUtils.serialize2Json(laneActiveMap));
}
}
private void freshWhenInstancesChange(List<Instance> currentServices) {
if (currentServices == null || currentServices.isEmpty()) {
return;
}
Set<String> currentActiveGroupSet = new HashSet<>();
// get all active groups
for (Instance healthService : currentServices) {
String nsId = healthService.getMetadata().get("TSF_NAMESPACE_ID");
String groupId = healthService.getMetadata().get("TSF_GROUP_ID");
String applicationId = healthService.getMetadata().get("TSF_APPLICATION_ID");
if (tsfNamespaceId.equals(nsId) && tsfApplicationId.equals(applicationId) && StringUtils.isNotEmpty(groupId)) {
currentActiveGroupSet.add(groupId);
}
}
activeGroupSet = currentActiveGroupSet;
freshLaneStatus();
}
/**
* update lane status.
*/
public void freshLaneStatus() {
Map<String, Boolean> currentLaneActiveMap = new HashMap<>();
Set<String> tempCurrentGroupLaneIds = new HashSet<>();
ServiceKey localService = new ServiceKey(com.tencent.cloud.common.metadata.MetadataContext.LOCAL_NAMESPACE,
com.tencent.cloud.common.metadata.MetadataContext.LOCAL_SERVICE);
List<LaneProto.LaneGroup> groups = LaneUtils.getLaneGroups(localService, polarisSDKContextManager.getSDKContext().getExtensions());
for (LaneProto.LaneGroup laneGroup : groups) {
for (LaneProto.LaneRule laneRule : laneGroup.getRulesList()) {
// in tsf, if namespace id and application id are in lane rule, it means the service is in lane.
if (!laneGroup.getMetadataMap().containsKey(tsfNamespaceId + "," + tsfApplicationId)) {
continue;
}
// in tsf, lane label key is TsfMetadataConstants.TSF_GROUP_ID
if (!TsfMetadataConstants.TSF_GROUP_ID.equals(laneRule.getLabelKey())
|| StringUtils.isEmpty(laneRule.getDefaultLabelValue())) {
continue;
}
for (String groupId : laneRule.getDefaultLabelValue().split(",")) {
if (activeGroupSet.contains(groupId)) {
// active group, update lane active status
currentLaneActiveMap.put(laneRule.getId(), true);
}
else {
// inactive group, mark lane as inactive only if no other active group exists
currentLaneActiveMap.putIfAbsent(laneRule.getId(), false);
}
if (StringUtils.equals(groupId, tsfGroupId)) {
tempCurrentGroupLaneIds.add(laneRule.getId());
}
}
}
}
laneActiveMap = currentLaneActiveMap;
currentGroupLaneIds = tempCurrentGroupLaneIds;
}
public boolean isLaneExist(String laneId) {
return laneActiveMap.containsKey(laneId);
}
public boolean isActiveLane(String laneId) {
return laneActiveMap.getOrDefault(laneId, false);
}
public Set<String> getCurrentGroupLaneIds() {
return currentGroupLaneIds;
}
}

@ -0,0 +1,58 @@
/*
* Tencent is pleased to support the open source community by making spring-cloud-tencent available.
*
* Copyright (C) 2021 Tencent. All rights reserved.
*
* Licensed under the BSD 3-Clause License (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.tencent.cloud.plugin.kafka.tsf;
import com.tencent.polaris.api.plugin.registry.AbstractResourceEventListener;
import com.tencent.polaris.api.pojo.RegistryCacheValue;
import com.tencent.polaris.api.pojo.ServiceEventKey;
public class TsfLaneRuleListener extends AbstractResourceEventListener {
private final TsfActiveLane tsfActiveLane;
public TsfLaneRuleListener(TsfActiveLane tsfActiveLane) {
this.tsfActiveLane = tsfActiveLane;
}
@Override
public void onResourceAdd(ServiceEventKey svcEventKey, RegistryCacheValue newValue) {
if (svcEventKey.getEventType() != ServiceEventKey.EventType.LANE_RULE) {
return;
}
tsfActiveLane.freshLaneStatus();
}
@Override
public void onResourceUpdated(ServiceEventKey svcEventKey, RegistryCacheValue oldValue, RegistryCacheValue newValue) {
if (svcEventKey.getEventType() != ServiceEventKey.EventType.LANE_RULE) {
return;
}
tsfActiveLane.freshLaneStatus();
}
@Override
public void onResourceDeleted(ServiceEventKey svcEventKey, RegistryCacheValue oldValue) {
if (svcEventKey.getEventType() != ServiceEventKey.EventType.LANE_RULE) {
return;
}
tsfActiveLane.freshLaneStatus();
}
}

@ -0,0 +1,2 @@
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.tencent.cloud.plugin.kafka.KafkaLaneAspectConfiguration

@ -0,0 +1,189 @@
/*
* Tencent is pleased to support the open source community by making spring-cloud-tencent available.
*
* Copyright (C) 2021 Tencent. All rights reserved.
*
* Licensed under the BSD 3-Clause License (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.tencent.cloud.plugin.kafka;
import java.lang.reflect.Field;
import java.util.concurrent.atomic.AtomicBoolean;
import com.tencent.cloud.common.tsf.TsfContextUtils;
import com.tencent.cloud.plugin.kafka.tsf.TsfActiveLane;
import com.tencent.cloud.polaris.context.PolarisSDKContextManager;
import com.tencent.cloud.polaris.discovery.PolarisDiscoveryHandler;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.boot.autoconfigure.AutoConfigurations;
import org.springframework.boot.test.context.runner.ApplicationContextRunner;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
/**
* Test for {@link KafkaLaneAspectConfiguration}.
*/
public class KafkaLaneAspectConfigurationTest {
private ApplicationContextRunner contextRunner;
@BeforeEach
public void setUp() {
contextRunner = new ApplicationContextRunner()
.withConfiguration(AutoConfigurations.of(KafkaLaneAspectConfiguration.class))
.withUserConfiguration(MockConfiguration.class);
}
@AfterEach
public void tearDown() {
// Reset static state for clean tests using reflection
resetTsfContextUtilsStaticFields();
}
private void resetTsfContextUtilsStaticFields() {
try {
// Reset isOnlyTsfConsulEnabledFirstConfiguration
Field isOnlyTsfConsulEnabledFirstConfigurationField = TsfContextUtils.class.getDeclaredField("isOnlyTsfConsulEnabledFirstConfiguration");
isOnlyTsfConsulEnabledFirstConfigurationField.setAccessible(true);
AtomicBoolean isOnlyTsfConsulEnabledFirstConfiguration = (AtomicBoolean) isOnlyTsfConsulEnabledFirstConfigurationField.get(null);
isOnlyTsfConsulEnabledFirstConfiguration.set(true);
// Reset isTsfConsulEnabledFirstConfiguration
Field isTsfConsulEnabledFirstConfigurationField = TsfContextUtils.class.getDeclaredField("isTsfConsulEnabledFirstConfiguration");
isTsfConsulEnabledFirstConfigurationField.setAccessible(true);
AtomicBoolean isTsfConsulEnabledFirstConfiguration = (AtomicBoolean) isTsfConsulEnabledFirstConfigurationField.get(null);
isTsfConsulEnabledFirstConfiguration.set(true);
// Reset tsfConsulEnabled
Field tsfConsulEnabledField = TsfContextUtils.class.getDeclaredField("tsfConsulEnabled");
tsfConsulEnabledField.setAccessible(true);
tsfConsulEnabledField.setBoolean(null, false);
// Reset onlyTsfConsulEnabled
Field onlyTsfConsulEnabledField = TsfContextUtils.class.getDeclaredField("onlyTsfConsulEnabled");
onlyTsfConsulEnabledField.setAccessible(true);
onlyTsfConsulEnabledField.setBoolean(null, false);
}
catch (Exception e) {
throw new RuntimeException("Failed to reset TsfContextUtils static fields", e);
}
}
@Test
public void testKafkaLaneAspectBeanCreationWhenKafkaTemplatePresent() {
resetTsfContextUtilsStaticFields();
// Simulate KafkaTemplate class presence
contextRunner
.withPropertyValues("spring.cloud.polaris.lane.kafka.lane-on=true")
.run(context -> {
// KafkaLaneAspect should be created when KafkaTemplate is present
assertThat(context).hasSingleBean(KafkaLaneAspect.class);
KafkaLaneAspect aspect = context.getBean(KafkaLaneAspect.class);
assertThat(aspect).isNotNull();
});
}
@Test
public void testTsfActiveLaneBeanNotCreatedWhenPolarisAddressPresent() {
resetTsfContextUtilsStaticFields();
// Simulate Polaris address present (not only TSF Consul)
contextRunner
.withPropertyValues(
"tsf_consul_enable=true",
"tsf_consul_ip=127.0.0.1",
"spring.cloud.polaris.address=127.0.0.1:8091"
)
.run(context -> {
// TsfActiveLane should NOT be created when Polaris address is present
assertThat(context).doesNotHaveBean(TsfActiveLane.class);
});
}
@Test
public void testKafkaLanePropertiesEnabled() {
resetTsfContextUtilsStaticFields();
contextRunner
.withPropertyValues(
"spring.cloud.polaris.lane.kafka.lane-on=true",
"spring.cloud.polaris.lane.kafka.lane-consume-main=true",
"spring.cloud.polaris.lane.kafka.main-consume-lane=true"
)
.run(context -> {
KafkaLaneProperties properties = context.getBean(KafkaLaneProperties.class);
assertThat(properties.getLaneOn()).isTrue();
assertThat(properties.getLaneConsumeMain()).isTrue();
assertThat(properties.getMainConsumeLane()).isTrue();
});
}
@Test
public void testBeanDependenciesInjection() {
resetTsfContextUtilsStaticFields();
contextRunner
.withPropertyValues("spring.cloud.polaris.lane.kafka.lane-on=true")
.run(context -> {
// Verify that all required dependencies are properly injected
assertThat(context).hasSingleBean(KafkaLaneAspect.class);
KafkaLaneAspect aspect = context.getBean(KafkaLaneAspect.class);
// The aspect should have all required dependencies
assertThat(aspect).isNotNull();
// Verify that KafkaLaneProperties is properly configured
KafkaLaneProperties properties = context.getBean(KafkaLaneProperties.class);
assertThat(properties).isNotNull();
assertThat(properties.getLaneOn()).isTrue();
});
}
@Test
public void testTsfActiveLaneBeanCreationWhenOnlyTsfConsulEnabled() {
resetTsfContextUtilsStaticFields();
// Simulate Only TSF Consul enabled condition
contextRunner
.withPropertyValues(
"tsf_consul_enable=true",
"tsf_consul_ip=127.0.0.1",
"spring.cloud.polaris.address=" // Empty to simulate only TSF Consul
)
.run(context -> {
// TsfActiveLane should be created when only TSF Consul is enabled
assertThat(context).hasSingleBean(TsfActiveLane.class);
TsfActiveLane tsfActiveLane = context.getBean(TsfActiveLane.class);
assertThat(tsfActiveLane).isNotNull();
});
}
@Configuration
static class MockConfiguration {
@Bean
public PolarisSDKContextManager polarisSDKContextManager() {
return mock(PolarisSDKContextManager.class);
}
@Bean
public PolarisDiscoveryHandler polarisDiscoveryHandler() {
return mock(PolarisDiscoveryHandler.class);
}
}
}

@ -0,0 +1,349 @@
/*
* Tencent is pleased to support the open source community by making spring-cloud-tencent available.
*
* Copyright (C) 2021 Tencent. All rights reserved.
*
* Licensed under the BSD 3-Clause License (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.tencent.cloud.plugin.kafka;
import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import com.tencent.cloud.common.tsf.TsfContextUtils;
import com.tencent.cloud.plugin.kafka.tsf.TsfActiveLane;
import com.tencent.cloud.polaris.context.PolarisSDKContextManager;
import com.tencent.polaris.plugins.router.lane.LaneUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.aspectj.lang.ProceedingJoinPoint;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
/**
* Test for {@link KafkaLaneAspect}.
*/
public class KafkaLaneAspectTest {
private KafkaLaneAspect kafkaLaneAspect;
private PolarisSDKContextManager polarisSDKContextManager;
private KafkaLaneProperties kafkaLaneProperties;
private TsfActiveLane tsfActiveLane;
private MockedStatic<LaneUtils> laneUtilsMockedStatic;
private MockedStatic<TsfContextUtils> tsfContextUtilsMockedStatic;
@BeforeEach
public void setUp() {
polarisSDKContextManager = mock(PolarisSDKContextManager.class);
kafkaLaneProperties = new KafkaLaneProperties();
tsfActiveLane = mock(TsfActiveLane.class);
laneUtilsMockedStatic = Mockito.mockStatic(LaneUtils.class);
tsfContextUtilsMockedStatic = Mockito.mockStatic(TsfContextUtils.class);
kafkaLaneAspect = new KafkaLaneAspect(polarisSDKContextManager, kafkaLaneProperties, tsfActiveLane);
}
@AfterEach
public void tearDown() {
laneUtilsMockedStatic.close();
tsfContextUtilsMockedStatic.close();
resetTsfContextUtilsStaticFields();
}
private void resetTsfContextUtilsStaticFields() {
try {
// Reset isOnlyTsfConsulEnabledFirstConfiguration
Field isOnlyTsfConsulEnabledFirstConfigurationField = TsfContextUtils.class.getDeclaredField("isOnlyTsfConsulEnabledFirstConfiguration");
isOnlyTsfConsulEnabledFirstConfigurationField.setAccessible(true);
AtomicBoolean isOnlyTsfConsulEnabledFirstConfiguration = (AtomicBoolean) isOnlyTsfConsulEnabledFirstConfigurationField.get(null);
isOnlyTsfConsulEnabledFirstConfiguration.set(true);
// Reset isTsfConsulEnabledFirstConfiguration
Field isTsfConsulEnabledFirstConfigurationField = TsfContextUtils.class.getDeclaredField("isTsfConsulEnabledFirstConfiguration");
isTsfConsulEnabledFirstConfigurationField.setAccessible(true);
AtomicBoolean isTsfConsulEnabledFirstConfiguration = (AtomicBoolean) isTsfConsulEnabledFirstConfigurationField.get(null);
isTsfConsulEnabledFirstConfiguration.set(true);
// Reset tsfConsulEnabled
Field tsfConsulEnabledField = TsfContextUtils.class.getDeclaredField("tsfConsulEnabled");
tsfConsulEnabledField.setAccessible(true);
tsfConsulEnabledField.setBoolean(null, false);
// Reset onlyTsfConsulEnabled
Field onlyTsfConsulEnabledField = TsfContextUtils.class.getDeclaredField("onlyTsfConsulEnabled");
onlyTsfConsulEnabledField.setAccessible(true);
onlyTsfConsulEnabledField.setBoolean(null, false);
}
catch (Exception e) {
throw new RuntimeException("Failed to reset TsfContextUtils static fields", e);
}
}
@Test
public void testProducerAspectWhenLaneDisabled() throws Throwable {
// Given
kafkaLaneProperties.setLaneOn(false);
ProceedingJoinPoint pjp = mock(ProceedingJoinPoint.class);
Object[] args = new Object[] {mock(ProducerRecord.class)};
when(pjp.getArgs()).thenReturn(args);
when(pjp.proceed(args)).thenReturn("result");
// When
Object result = kafkaLaneAspect.aroundProducerMessage(pjp);
// Then
assertThat(result).isEqualTo("result");
verify(pjp).proceed(args);
}
@Test
public void testProducerAspectWhenNoLaneId() throws Throwable {
// Given
kafkaLaneProperties.setLaneOn(true);
laneUtilsMockedStatic.when(() -> LaneUtils.fetchLaneByCaller(any(), any(), any())).thenReturn(null);
ProceedingJoinPoint pjp = mock(ProceedingJoinPoint.class);
Object[] args = new Object[] {mock(ProducerRecord.class)};
when(pjp.getArgs()).thenReturn(args);
when(pjp.proceed(args)).thenReturn("result");
// When
Object result = kafkaLaneAspect.aroundProducerMessage(pjp);
// Then
assertThat(result).isEqualTo("result");
verify(pjp).proceed(args);
}
@Test
public void testProducerAspectWithProducerRecord() throws Throwable {
// Given
kafkaLaneProperties.setLaneOn(true);
String laneId = "test-lane-id";
laneUtilsMockedStatic.when(() -> LaneUtils.fetchLaneByCaller(any(), any(), any())).thenReturn(laneId);
ProceedingJoinPoint pjp = mock(ProceedingJoinPoint.class);
KafkaTemplate kafkaTemplate = mock(KafkaTemplate.class);
ProducerRecord producerRecord = new ProducerRecord<>("test-topic", "test-value");
when(pjp.getTarget()).thenReturn(kafkaTemplate);
when(pjp.getArgs()).thenReturn(new Object[] {producerRecord});
when(kafkaTemplate.send(producerRecord)).thenReturn(null);
// When
kafkaLaneAspect.aroundProducerMessage(pjp);
// Then
Iterator<Header> headers = producerRecord.headers().headers("tsf_laneId").iterator();
assertThat(headers.hasNext()).isTrue();
Header laneHeader = headers.next();
assertThat(new String(laneHeader.value(), StandardCharsets.UTF_8)).isEqualTo(laneId);
}
@Test
public void testProducerAspectWithMessage() throws Throwable {
// Given
kafkaLaneProperties.setLaneOn(true);
String laneId = "test-lane-id";
laneUtilsMockedStatic.when(() -> LaneUtils.fetchLaneByCaller(any(), any(), any())).thenReturn(laneId);
ProceedingJoinPoint pjp = mock(ProceedingJoinPoint.class);
KafkaTemplate kafkaTemplate = mock(KafkaTemplate.class);
RecordMessageConverter recordMessageConverter = mock(RecordMessageConverter.class);
Message message = MessageBuilder.withPayload("test-payload").build();
ProducerRecord producerRecord = new ProducerRecord<>("test-topic", "test-value");
when(pjp.getTarget()).thenReturn(kafkaTemplate);
when(pjp.getArgs()).thenReturn(new Object[] {message});
when(kafkaTemplate.getDefaultTopic()).thenReturn("test-topic");
when(kafkaTemplate.send(producerRecord)).thenReturn(null);
when(kafkaTemplate.getMessageConverter()).thenReturn(recordMessageConverter);
// When
kafkaLaneAspect.aroundProducerMessage(pjp);
}
@Test
public void testConsumerAspectWhenLaneDisabled() throws Throwable {
// Given
kafkaLaneProperties.setLaneOn(false);
ProceedingJoinPoint pjp = mock(ProceedingJoinPoint.class);
Object[] args = new Object[] {mock(ConsumerRecord.class)};
when(pjp.getArgs()).thenReturn(args);
when(pjp.proceed(args)).thenReturn("result");
// When
Object result = kafkaLaneAspect.aroundConsumerMessage(pjp);
// Then
assertThat(result).isEqualTo("result");
verify(pjp).proceed(args);
}
@Test
public void testConsumerAspectWithLaneHeader() throws Throwable {
// Given
kafkaLaneProperties.setLaneOn(true);
String laneId = "test-lane-id";
ConsumerRecord consumerRecord = new ConsumerRecord<>("test-topic", 0, 0L, "key", "value");
consumerRecord.headers().add("tsf_laneId", laneId.getBytes(StandardCharsets.UTF_8));
ProceedingJoinPoint pjp = mock(ProceedingJoinPoint.class);
Object[] args = new Object[] {consumerRecord};
when(pjp.getArgs()).thenReturn(args);
when(pjp.proceed(args)).thenReturn("result");
// When
Object result = kafkaLaneAspect.aroundConsumerMessage(pjp);
// Then
assertThat(result).isEqualTo(KafkaLaneAspect.EMPTY_OBJECT);
}
@Test
public void testGetConsumerRecordLaneIdFromHeader() {
// Given
ConsumerRecord consumerRecord = new ConsumerRecord<>("test-topic", 0, 0L, "key", "value");
String expectedLaneId = "test-lane-id";
consumerRecord.headers().add("tsf_laneId", expectedLaneId.getBytes(StandardCharsets.UTF_8));
// When
String laneId = kafkaLaneAspect.getConsumerRecordLaneId(consumerRecord);
// Then
assertThat(laneId).isEqualTo(expectedLaneId);
}
@Test
public void testGetConsumerRecordLaneIdFromCallerLane() {
// Given
ConsumerRecord consumerRecord = new ConsumerRecord<>("test-topic", 0, 0L, "key", "value");
String expectedLaneId = "lane-test";
laneUtilsMockedStatic.when(() -> LaneUtils.getCallerLaneId()).thenReturn(expectedLaneId);
// When
String laneId = kafkaLaneAspect.getConsumerRecordLaneId(consumerRecord);
// Then
assertThat(laneId).isEqualTo("tsf/" + expectedLaneId);
}
@Test
public void testIfConsumeInPolarisWithNoLaneId() {
// Given
kafkaLaneProperties.setLaneConsumeMain(true);
// Use reflection to set the lane field
try {
Field laneField = KafkaLaneAspect.class.getDeclaredField("lane");
laneField.setAccessible(true);
laneField.set(kafkaLaneAspect, "test-lane");
}
catch (Exception e) {
throw new RuntimeException(e);
}
// When
boolean shouldConsume = kafkaLaneAspect.ifConsume("");
// Then
assertThat(shouldConsume).isTrue(); // Because laneConsumeMain is true
}
@Test
public void testIfConsumeInTsfWithNoLaneId() {
// Given
kafkaLaneProperties.setLaneConsumeMain(true);
tsfContextUtilsMockedStatic.when(TsfContextUtils::isOnlyTsfConsulEnabled).thenReturn(true);
when(tsfActiveLane.getCurrentGroupLaneIds()).thenReturn(Collections.emptySet());
// When
boolean shouldConsume = kafkaLaneAspect.ifConsume("");
// Then
assertThat(shouldConsume).isTrue(); // Because laneConsumeMain is true
}
@Test
public void testConsumerAspectWithBatchMessages() throws Throwable {
// Given
kafkaLaneProperties.setLaneOn(true);
List<ConsumerRecord> messageList = new ArrayList<>();
ConsumerRecord record1 = new ConsumerRecord<>("test-topic", 0, 0L, "key", "value1");
record1.headers().add("tsf_laneId", "lane1".getBytes(StandardCharsets.UTF_8));
ConsumerRecord record2 = new ConsumerRecord<>("test-topic", 0, 1L, "key", "value2");
record2.headers().add("tsf_laneId", "lane2".getBytes(StandardCharsets.UTF_8));
messageList.add(record1);
messageList.add(record2);
Acknowledgment acknowledgment = mock(Acknowledgment.class);
ProceedingJoinPoint pjp = mock(ProceedingJoinPoint.class);
Object[] args = new Object[] {messageList, acknowledgment};
when(pjp.getArgs()).thenReturn(args);
when(pjp.proceed(any())).thenReturn("result");
// When
Object result = kafkaLaneAspect.aroundConsumerMessage(pjp);
// Then
assertThat(result).isEqualTo("result");
verify(pjp).proceed(any());
}
@Test
public void testConsumerAspectWithEmptyBatch() throws Throwable {
// Given
kafkaLaneProperties.setLaneOn(true);
List<ConsumerRecord> emptyList = new ArrayList<>();
ProceedingJoinPoint pjp = mock(ProceedingJoinPoint.class);
Object[] args = new Object[] {emptyList};
when(pjp.getArgs()).thenReturn(args);
when(pjp.proceed(args)).thenReturn("result");
// When
Object result = kafkaLaneAspect.aroundConsumerMessage(pjp);
// Then
assertThat(result).isEqualTo("result");
verify(pjp).proceed(args);
}
}

@ -0,0 +1,108 @@
/*
* Tencent is pleased to support the open source community by making spring-cloud-tencent available.
*
* Copyright (C) 2021 Tencent. All rights reserved.
*
* Licensed under the BSD 3-Clause License (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.tencent.cloud.plugin.kafka;
import org.junit.jupiter.api.Test;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.boot.test.context.runner.ApplicationContextRunner;
import static org.assertj.core.api.Assertions.assertThat;
/**
* Test for {@link KafkaLaneProperties}.
*/
public class KafkaLanePropertiesTest {
private final ApplicationContextRunner contextRunner = new ApplicationContextRunner()
.withUserConfiguration(TestConfiguration.class);
@Test
public void testDefaultValues() {
this.contextRunner.run(context -> {
KafkaLaneProperties properties = context.getBean(KafkaLaneProperties.class);
// test default values
assertThat(properties.getLaneOn()).isFalse();
assertThat(properties.getLaneConsumeMain()).isFalse();
assertThat(properties.getMainConsumeLane()).isFalse();
});
}
@Test
public void testConfigurationPropertiesPrefix() {
this.contextRunner
.withPropertyValues(
"spring.cloud.polaris.lane.kafka.lane-on=true",
"spring.cloud.polaris.lane.kafka.lane-consume-main=true",
"spring.cloud.polaris.lane.kafka.main-consume-lane=true"
)
.run(context -> {
KafkaLaneProperties properties = context.getBean(KafkaLaneProperties.class);
// test properties
assertThat(properties.getLaneOn()).isTrue();
assertThat(properties.getLaneConsumeMain()).isTrue();
assertThat(properties.getMainConsumeLane()).isTrue();
});
}
@Test
public void testGetterAndSetter() {
KafkaLaneProperties properties = new KafkaLaneProperties();
// test laneOn property
properties.setLaneOn(true);
assertThat(properties.getLaneOn()).isTrue();
properties.setLaneOn(false);
assertThat(properties.getLaneOn()).isFalse();
// test laneConsumeMain property
properties.setLaneConsumeMain(true);
assertThat(properties.getLaneConsumeMain()).isTrue();
properties.setLaneConsumeMain(false);
assertThat(properties.getLaneConsumeMain()).isFalse();
// test mainConsumeLane property
properties.setMainConsumeLane(true);
assertThat(properties.getMainConsumeLane()).isTrue();
properties.setMainConsumeLane(false);
assertThat(properties.getMainConsumeLane()).isFalse();
}
@Test
public void testPartialConfiguration() {
this.contextRunner
.withPropertyValues(
"spring.cloud.polaris.lane.kafka.lane-on=true"
)
.run(context -> {
KafkaLaneProperties properties = context.getBean(KafkaLaneProperties.class);
// test laneOn property
assertThat(properties.getLaneOn()).isTrue();
// test other properties keep default values
assertThat(properties.getLaneConsumeMain()).isFalse();
assertThat(properties.getMainConsumeLane()).isFalse();
});
}
@EnableConfigurationProperties(KafkaLaneProperties.class)
static class TestConfiguration {
}
}

@ -0,0 +1,252 @@
/*
* Tencent is pleased to support the open source community by making spring-cloud-tencent available.
*
* Copyright (C) 2021 Tencent. All rights reserved.
*
* Licensed under the BSD 3-Clause License (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.tencent.cloud.plugin.kafka.tsf;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import com.tencent.cloud.polaris.context.PolarisSDKContextManager;
import com.tencent.cloud.polaris.discovery.PolarisDiscoveryHandler;
import com.tencent.polaris.api.plugin.compose.Extensions;
import com.tencent.polaris.api.pojo.Instance;
import com.tencent.polaris.api.pojo.ServiceKey;
import com.tencent.polaris.client.api.SDKContext;
import com.tencent.polaris.metadata.core.constant.TsfMetadataConstants;
import com.tencent.polaris.plugins.router.lane.LaneUtils;
import com.tencent.polaris.specification.api.v1.traffic.manage.LaneProto;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.springframework.test.util.ReflectionTestUtils;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
/**
* Test for {@link TsfActiveLane}.
*/
public class TsfActiveLaneTest {
private TsfActiveLane tsfActiveLane;
private PolarisSDKContextManager polarisSDKContextManager;
private PolarisDiscoveryHandler discoveryClient;
private SDKContext sdkContext;
private Extensions extensions;
private MockedStatic<LaneUtils> laneUtilsMockedStatic;
@BeforeEach
public void setUp() {
polarisSDKContextManager = mock(PolarisSDKContextManager.class);
discoveryClient = mock(PolarisDiscoveryHandler.class);
sdkContext = mock(SDKContext.class);
extensions = mock(Extensions.class);
laneUtilsMockedStatic = Mockito.mockStatic(LaneUtils.class);
when(polarisSDKContextManager.getSDKContext()).thenReturn(sdkContext);
when(sdkContext.getExtensions()).thenReturn(extensions);
tsfActiveLane = new TsfActiveLane(polarisSDKContextManager, discoveryClient);
// Set up field values using reflection
ReflectionTestUtils.setField(tsfActiveLane, "tsfNamespaceId", "test-namespace");
ReflectionTestUtils.setField(tsfActiveLane, "tsfGroupId", "test-group");
ReflectionTestUtils.setField(tsfActiveLane, "tsfApplicationId", "test-app");
ReflectionTestUtils.setField(tsfActiveLane, "springApplicationName", "test-service");
}
@AfterEach
public void tearDown() {
laneUtilsMockedStatic.close();
}
@Test
public void testConstructorInitialization() {
// Verify that constructor properly initializes dependencies
assertThat(tsfActiveLane).isNotNull();
verify(polarisSDKContextManager, times(1)).getSDKContext();
verify(sdkContext, times(1)).getExtensions();
}
@Test
public void testCallbackWithEmptyInstances() {
// Given
List<Instance> currentInstances = Collections.emptyList();
List<Instance> addInstances = Collections.emptyList();
List<Instance> deleteInstances = Collections.emptyList();
// When
tsfActiveLane.callback(currentInstances, addInstances, deleteInstances);
// Then
// Should not throw any exceptions and handle empty instances gracefully
}
@Test
public void testFreshLaneStatusWithActiveGroups() {
// Given
Set<String> activeGroups = new HashSet<>(Arrays.asList("group1", "group2"));
ReflectionTestUtils.setField(tsfActiveLane, "activeGroupSet", activeGroups);
// Mock lane groups and rules
LaneProto.LaneGroup laneGroup = mock(LaneProto.LaneGroup.class);
LaneProto.LaneRule laneRule = mock(LaneProto.LaneRule.class);
when(laneGroup.getMetadataMap()).thenReturn(createMetadataMap("test-namespace,test-app"));
when(laneGroup.getRulesList()).thenReturn(Collections.singletonList(laneRule));
when(laneRule.getLabelKey()).thenReturn(TsfMetadataConstants.TSF_GROUP_ID);
when(laneRule.getDefaultLabelValue()).thenReturn("test-group,group2,group3");
when(laneRule.getId()).thenReturn("lane1");
laneUtilsMockedStatic.when(() -> LaneUtils.getLaneGroups(any(ServiceKey.class), any(Extensions.class)))
.thenReturn(Collections.singletonList(laneGroup));
// When
ReflectionTestUtils.invokeMethod(tsfActiveLane, "freshLaneStatus");
// Then
Map<String, Boolean> laneActiveMap = (Map<String, Boolean>) ReflectionTestUtils.getField(tsfActiveLane, "laneActiveMap");
Set<String> currentGroupLaneIds = (Set<String>) ReflectionTestUtils.getField(tsfActiveLane, "currentGroupLaneIds");
assertThat(laneActiveMap).containsEntry("lane1", true);
assertThat(currentGroupLaneIds).contains("lane1");
}
@Test
public void testFreshLaneStatusWithInactiveGroups() {
// Given
Set<String> activeGroups = new HashSet<>(Collections.singletonList("group1"));
ReflectionTestUtils.setField(tsfActiveLane, "activeGroupSet", activeGroups);
// Mock lane groups and rules
LaneProto.LaneGroup laneGroup = mock(LaneProto.LaneGroup.class);
LaneProto.LaneRule laneRule = mock(LaneProto.LaneRule.class);
when(laneGroup.getMetadataMap()).thenReturn(createMetadataMap("test-namespace,test-app"));
when(laneGroup.getRulesList()).thenReturn(Collections.singletonList(laneRule));
when(laneRule.getLabelKey()).thenReturn(TsfMetadataConstants.TSF_GROUP_ID);
when(laneRule.getDefaultLabelValue()).thenReturn("group2,group3");
when(laneRule.getId()).thenReturn("lane1");
laneUtilsMockedStatic.when(() -> LaneUtils.getLaneGroups(any(ServiceKey.class), any(Extensions.class)))
.thenReturn(Collections.singletonList(laneGroup));
// When
ReflectionTestUtils.invokeMethod(tsfActiveLane, "freshLaneStatus");
// Then
Map<String, Boolean> laneActiveMap = (Map<String, Boolean>) ReflectionTestUtils.getField(tsfActiveLane, "laneActiveMap");
assertThat(laneActiveMap).containsEntry("lane1", false);
}
@Test
public void testIsLaneExist() {
// Given
Map<String, Boolean> laneActiveMap = new HashMap<>();
laneActiveMap.put("lane1", true);
laneActiveMap.put("lane2", false);
ReflectionTestUtils.setField(tsfActiveLane, "laneActiveMap", laneActiveMap);
// When & Then
assertThat(tsfActiveLane.isLaneExist("lane1")).isTrue();
assertThat(tsfActiveLane.isLaneExist("lane2")).isTrue();
assertThat(tsfActiveLane.isLaneExist("lane3")).isFalse();
}
@Test
public void testIsActiveLane() {
// Given
Map<String, Boolean> laneActiveMap = new HashMap<>();
laneActiveMap.put("lane1", true);
laneActiveMap.put("lane2", false);
ReflectionTestUtils.setField(tsfActiveLane, "laneActiveMap", laneActiveMap);
// When & Then
assertThat(tsfActiveLane.isActiveLane("lane1")).isTrue();
assertThat(tsfActiveLane.isActiveLane("lane2")).isFalse();
assertThat(tsfActiveLane.isActiveLane("lane3")).isFalse();
}
@Test
public void testGetCurrentGroupLaneIds() {
// Given
Set<String> currentGroupLaneIds = new TreeSet<>(Arrays.asList("lane1", "lane2"));
ReflectionTestUtils.setField(tsfActiveLane, "currentGroupLaneIds", currentGroupLaneIds);
// When
Set<String> result = tsfActiveLane.getCurrentGroupLaneIds();
// Then
assertThat(result).containsExactly("lane1", "lane2");
}
@Test
public void testFreshWhenInstancesChangeWithEmptyList() {
// Given
List<Instance> emptyInstances = Collections.emptyList();
// When
ReflectionTestUtils.invokeMethod(tsfActiveLane, "freshWhenInstancesChange", emptyInstances);
// Then
// Should handle empty list gracefully without exceptions
}
@Test
public void testFreshWhenInstancesChangeWithValidInstances() {
// Given
Instance instance = mock(Instance.class);
when(instance.getMetadata()).thenReturn(createMetadata("test-namespace", "group1", "test-app"));
List<Instance> instances = Collections.singletonList(instance);
// When
ReflectionTestUtils.invokeMethod(tsfActiveLane, "freshWhenInstancesChange", instances);
// Then
Set<String> activeGroupSet = (Set<String>) ReflectionTestUtils.getField(tsfActiveLane, "activeGroupSet");
assertThat(activeGroupSet).contains("group1");
}
private Map<String, String> createMetadata(String namespaceId, String groupId, String applicationId) {
Map<String, String> metadata = new HashMap<>();
metadata.put("TSF_NAMESPACE_ID", namespaceId);
metadata.put("TSF_GROUP_ID", groupId);
metadata.put("TSF_APPLICATION_ID", applicationId);
return metadata;
}
private Map<String, String> createMetadataMap(String value) {
Map<String, String> metadataMap = new HashMap<>();
metadataMap.put("test-namespace,test-app", value);
return metadataMap;
}
}

@ -0,0 +1,167 @@
/*
* Tencent is pleased to support the open source community by making spring-cloud-tencent available.
*
* Copyright (C) 2021 Tencent. All rights reserved.
*
* Licensed under the BSD 3-Clause License (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.tencent.cloud.plugin.kafka.tsf;
import com.tencent.polaris.api.pojo.RegistryCacheValue;
import com.tencent.polaris.api.pojo.ServiceEventKey;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
/**
* Test for {@link TsfLaneRuleListener}.
*/
public class TsfLaneRuleListenerTest {
private TsfActiveLane tsfActiveLane;
private TsfLaneRuleListener tsfLaneRuleListener;
@BeforeEach
public void setUp() {
tsfActiveLane = mock(TsfActiveLane.class);
tsfLaneRuleListener = new TsfLaneRuleListener(tsfActiveLane);
}
@Test
public void testConstructorInitialization() {
// Verify that constructor properly initializes the TsfActiveLane dependency
assert tsfLaneRuleListener != null;
}
@Test
public void testOnResourceAddWithLaneRuleEvent() {
// Given
ServiceEventKey svcEventKey = mock(ServiceEventKey.class);
RegistryCacheValue newValue = mock(RegistryCacheValue.class);
when(svcEventKey.getEventType()).thenReturn(ServiceEventKey.EventType.LANE_RULE);
// When
tsfLaneRuleListener.onResourceAdd(svcEventKey, newValue);
// Then
verify(tsfActiveLane).freshLaneStatus();
}
@Test
public void testOnResourceAddWithNonLaneRuleEvent() {
// Given
ServiceEventKey svcEventKey = mock(ServiceEventKey.class);
RegistryCacheValue newValue = mock(RegistryCacheValue.class);
when(svcEventKey.getEventType()).thenReturn(ServiceEventKey.EventType.INSTANCE);
// When
tsfLaneRuleListener.onResourceAdd(svcEventKey, newValue);
// Then
verify(tsfActiveLane, never()).freshLaneStatus();
}
@Test
public void testOnResourceUpdatedWithLaneRuleEvent() {
// Given
ServiceEventKey svcEventKey = mock(ServiceEventKey.class);
RegistryCacheValue oldValue = mock(RegistryCacheValue.class);
RegistryCacheValue newValue = mock(RegistryCacheValue.class);
when(svcEventKey.getEventType()).thenReturn(ServiceEventKey.EventType.LANE_RULE);
// When
tsfLaneRuleListener.onResourceUpdated(svcEventKey, oldValue, newValue);
// Then
verify(tsfActiveLane).freshLaneStatus();
}
@Test
public void testOnResourceUpdatedWithNonLaneRuleEvent() {
// Given
ServiceEventKey svcEventKey = mock(ServiceEventKey.class);
RegistryCacheValue oldValue = mock(RegistryCacheValue.class);
RegistryCacheValue newValue = mock(RegistryCacheValue.class);
when(svcEventKey.getEventType()).thenReturn(ServiceEventKey.EventType.SERVICE);
// When
tsfLaneRuleListener.onResourceUpdated(svcEventKey, oldValue, newValue);
// Then
verify(tsfActiveLane, never()).freshLaneStatus();
}
@Test
public void testOnResourceDeletedWithLaneRuleEvent() {
// Given
ServiceEventKey svcEventKey = mock(ServiceEventKey.class);
RegistryCacheValue oldValue = mock(RegistryCacheValue.class);
when(svcEventKey.getEventType()).thenReturn(ServiceEventKey.EventType.LANE_RULE);
// When
tsfLaneRuleListener.onResourceDeleted(svcEventKey, oldValue);
// Then
verify(tsfActiveLane).freshLaneStatus();
}
@Test
public void testOnResourceDeletedWithNonLaneRuleEvent() {
// Given
ServiceEventKey svcEventKey = mock(ServiceEventKey.class);
RegistryCacheValue oldValue = mock(RegistryCacheValue.class);
when(svcEventKey.getEventType()).thenReturn(ServiceEventKey.EventType.ROUTING);
// When
tsfLaneRuleListener.onResourceDeleted(svcEventKey, oldValue);
// Then
verify(tsfActiveLane, never()).freshLaneStatus();
}
@Test
public void testEventTypeCoverage() {
// Test all event types to ensure proper filtering
for (ServiceEventKey.EventType eventType : ServiceEventKey.EventType.values()) {
ServiceEventKey svcEventKey = mock(ServiceEventKey.class);
RegistryCacheValue value = mock(RegistryCacheValue.class);
when(svcEventKey.getEventType()).thenReturn(eventType);
// When
tsfLaneRuleListener.onResourceAdd(svcEventKey, value);
// Then
if (eventType == ServiceEventKey.EventType.LANE_RULE) {
verify(tsfActiveLane).freshLaneStatus();
}
else {
verify(tsfActiveLane, never()).freshLaneStatus();
}
// Reset mock for next iteration
Mockito.reset(tsfActiveLane);
}
}
}
Loading…
Cancel
Save