From eb2ed0f7d462765dbfd4fee04d8e1d9fe3d31f9c Mon Sep 17 00:00:00 2001 From: shedfreewu Date: Wed, 26 Nov 2025 19:18:10 +0800 Subject: [PATCH] feat: support kafka lane --- .../ServiceInstanceChangeCallbackManager.java | 1 + spring-cloud-tencent-dependencies/pom.xml | 6 + spring-cloud-tencent-plugin-starters/pom.xml | 1 + .../pom.xml | 66 ++++ .../cloud/plugin/kafka/KafkaLaneAspect.java | 367 ++++++++++++++++++ .../kafka/KafkaLaneAspectConfiguration.java | 48 +++ .../plugin/kafka/KafkaLaneProperties.java | 80 ++++ .../cloud/plugin/kafka/tsf/TsfActiveLane.java | 186 +++++++++ .../plugin/kafka/tsf/TsfLaneRuleListener.java | 58 +++ .../main/resources/META-INF/spring.factories | 2 + .../KafkaLaneAspectConfigurationTest.java | 189 +++++++++ .../plugin/kafka/KafkaLaneAspectTest.java | 349 +++++++++++++++++ .../plugin/kafka/KafkaLanePropertiesTest.java | 108 ++++++ .../plugin/kafka/tsf/TsfActiveLaneTest.java | 252 ++++++++++++ .../kafka/tsf/TsfLaneRuleListenerTest.java | 167 ++++++++ 15 files changed, 1880 insertions(+) create mode 100644 spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-kafka-lane-plugin/pom.xml create mode 100644 spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-kafka-lane-plugin/src/main/java/com/tencent/cloud/plugin/kafka/KafkaLaneAspect.java create mode 100644 spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-kafka-lane-plugin/src/main/java/com/tencent/cloud/plugin/kafka/KafkaLaneAspectConfiguration.java create mode 100644 spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-kafka-lane-plugin/src/main/java/com/tencent/cloud/plugin/kafka/KafkaLaneProperties.java create mode 100644 spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-kafka-lane-plugin/src/main/java/com/tencent/cloud/plugin/kafka/tsf/TsfActiveLane.java create mode 100644 spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-kafka-lane-plugin/src/main/java/com/tencent/cloud/plugin/kafka/tsf/TsfLaneRuleListener.java create mode 100644 spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-kafka-lane-plugin/src/main/resources/META-INF/spring.factories create mode 100644 spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-kafka-lane-plugin/src/test/java/com/tencent/cloud/plugin/kafka/KafkaLaneAspectConfigurationTest.java create mode 100644 spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-kafka-lane-plugin/src/test/java/com/tencent/cloud/plugin/kafka/KafkaLaneAspectTest.java create mode 100644 spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-kafka-lane-plugin/src/test/java/com/tencent/cloud/plugin/kafka/KafkaLanePropertiesTest.java create mode 100644 spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-kafka-lane-plugin/src/test/java/com/tencent/cloud/plugin/kafka/tsf/TsfActiveLaneTest.java create mode 100644 spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-kafka-lane-plugin/src/test/java/com/tencent/cloud/plugin/kafka/tsf/TsfLaneRuleListenerTest.java diff --git a/spring-cloud-starter-tencent-polaris-discovery/src/main/java/com/tencent/cloud/polaris/discovery/refresh/ServiceInstanceChangeCallbackManager.java b/spring-cloud-starter-tencent-polaris-discovery/src/main/java/com/tencent/cloud/polaris/discovery/refresh/ServiceInstanceChangeCallbackManager.java index e479359ef..7a12db4b0 100644 --- a/spring-cloud-starter-tencent-polaris-discovery/src/main/java/com/tencent/cloud/polaris/discovery/refresh/ServiceInstanceChangeCallbackManager.java +++ b/spring-cloud-starter-tencent-polaris-discovery/src/main/java/com/tencent/cloud/polaris/discovery/refresh/ServiceInstanceChangeCallbackManager.java @@ -101,6 +101,7 @@ public class ServiceInstanceChangeCallbackManager implements ApplicationListener if (clz.isAnnotationPresent(ServiceInstanceChangeListener.class)) { ServiceInstanceChangeListener serviceInstanceChangeListener = clz.getAnnotation(ServiceInstanceChangeListener.class); serviceName = serviceInstanceChangeListener.serviceName(); + serviceName = ApplicationContextAwareUtils.getApplicationContext().getEnvironment().resolvePlaceholders(serviceName); } if (StringUtils.isBlank(serviceName)) { diff --git a/spring-cloud-tencent-dependencies/pom.xml b/spring-cloud-tencent-dependencies/pom.xml index 24d557540..153a5d064 100644 --- a/spring-cloud-tencent-dependencies/pom.xml +++ b/spring-cloud-tencent-dependencies/pom.xml @@ -240,6 +240,12 @@ ${revision} + + com.tencent.cloud + spring-cloud-starter-tencent-kafka-lane-plugin + ${revision} + + org.springdoc diff --git a/spring-cloud-tencent-plugin-starters/pom.xml b/spring-cloud-tencent-plugin-starters/pom.xml index 23a26301e..a782ff74c 100644 --- a/spring-cloud-tencent-plugin-starters/pom.xml +++ b/spring-cloud-tencent-plugin-starters/pom.xml @@ -26,6 +26,7 @@ spring-cloud-starter-tencent-multi-discovery-plugin spring-cloud-starter-tencent-traffic-mirroring-plugin spring-cloud-starter-tencent-fault-injection-plugin + spring-cloud-starter-tencent-kafka-lane-plugin diff --git a/spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-kafka-lane-plugin/pom.xml b/spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-kafka-lane-plugin/pom.xml new file mode 100644 index 000000000..a86223859 --- /dev/null +++ b/spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-kafka-lane-plugin/pom.xml @@ -0,0 +1,66 @@ + + + + spring-cloud-tencent-plugin-starters + com.tencent.cloud + ${revision} + ../pom.xml + + 4.0.0 + + spring-cloud-starter-tencent-kafka-lane-plugin + Spring Cloud Starter Tencent Kafka Lane plugin + + + + + com.tencent.polaris + polaris-all + + + + com.tencent.cloud + spring-cloud-tencent-commons + + + + com.tencent.cloud + spring-cloud-tencent-polaris-context + + + + com.tencent.cloud + spring-cloud-starter-tencent-polaris-discovery + + + + org.springframework.boot + spring-boot-starter-aop + + + org.springframework.boot + spring-boot-starter-logging + + + + + + org.springframework.kafka + spring-kafka + provided + true + + + + org.springframework.boot + spring-boot-starter-json + provided + true + + + + + + diff --git a/spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-kafka-lane-plugin/src/main/java/com/tencent/cloud/plugin/kafka/KafkaLaneAspect.java b/spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-kafka-lane-plugin/src/main/java/com/tencent/cloud/plugin/kafka/KafkaLaneAspect.java new file mode 100644 index 000000000..0f0014e73 --- /dev/null +++ b/spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-kafka-lane-plugin/src/main/java/com/tencent/cloud/plugin/kafka/KafkaLaneAspect.java @@ -0,0 +1,367 @@ +/* + * Tencent is pleased to support the open source community by making spring-cloud-tencent available. + * + * Copyright (C) 2021 Tencent. All rights reserved. + * + * Licensed under the BSD 3-Clause License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://opensource.org/licenses/BSD-3-Clause + * + * Unless required by applicable law or agreed to in writing, software distributed + * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package com.tencent.cloud.plugin.kafka; + +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.Optional; +import java.util.Set; + +import com.tencent.cloud.common.metadata.MetadataContext; +import com.tencent.cloud.common.metadata.MetadataContextHolder; +import com.tencent.cloud.common.tsf.TsfContextUtils; +import com.tencent.cloud.plugin.kafka.tsf.TsfActiveLane; +import com.tencent.cloud.polaris.context.PolarisSDKContextManager; +import com.tencent.polaris.api.pojo.ServiceKey; +import com.tencent.polaris.api.utils.StringUtils; +import com.tencent.polaris.client.api.SDKContext; +import com.tencent.polaris.plugins.router.lane.LaneRouter; +import com.tencent.polaris.plugins.router.lane.LaneUtils; +import com.tencent.polaris.specification.api.v1.traffic.manage.LaneProto; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.internals.RecordHeader; +import org.aspectj.lang.ProceedingJoinPoint; +import org.aspectj.lang.annotation.Around; +import org.aspectj.lang.annotation.Aspect; +import org.aspectj.lang.annotation.Pointcut; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.springframework.beans.factory.annotation.Value; +import org.springframework.core.annotation.Order; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.support.Acknowledgment; +import org.springframework.kafka.support.KafkaHeaders; +import org.springframework.messaging.Message; + +@Aspect +@Order(1) +public class KafkaLaneAspect { + + private static final Logger LOG = LoggerFactory.getLogger(KafkaLaneAspect.class); + /** + * Empty object. + */ + public static final Object EMPTY_OBJECT = new Object(); + + private static final String TSF_LANE_ID = "tsf_laneId"; + + private final PolarisSDKContextManager polarisSDKContextManager; + + private final KafkaLaneProperties kafkaLaneProperties; + + private final TsfActiveLane tsfActiveLane; + + private final String laneHeaderKey; + + @Value("${spring.cloud.tencent.metadata.content.lane:}") + private String lane; + + public KafkaLaneAspect(PolarisSDKContextManager polarisSDKContextManager, KafkaLaneProperties kafkaLaneProperties, TsfActiveLane tsfActiveLane) { + this.polarisSDKContextManager = polarisSDKContextManager; + this.kafkaLaneProperties = kafkaLaneProperties; + this.tsfActiveLane = tsfActiveLane; + laneHeaderKey = TsfContextUtils.isOnlyTsfConsulEnabled() ? TSF_LANE_ID : MetadataContext.DEFAULT_TRANSITIVE_PREFIX + LaneRouter.TRAFFIC_STAIN_LABEL; + } + + @Pointcut("execution(* org.springframework.kafka.core.KafkaTemplate.send(..))") + private void producerPointcut() { + } + + @Around("producerPointcut()") + public Object aroundProducerMessage(ProceedingJoinPoint pjp) throws Throwable { + Object[] args = pjp.getArgs(); + KafkaTemplate target = (KafkaTemplate) pjp.getTarget(); + + if (!this.kafkaLaneProperties.getLaneOn()) { + return pjp.proceed(args); + } + + String laneId = LaneUtils.fetchLaneByCaller( + Optional.ofNullable(polarisSDKContextManager).map(PolarisSDKContextManager::getSDKContext).map(SDKContext::getExtensions).orElse(null), + com.tencent.cloud.common.metadata.MetadataContext.LOCAL_NAMESPACE, + com.tencent.cloud.common.metadata.MetadataContext.LOCAL_SERVICE); + + if (StringUtils.isBlank(laneId) || !kafkaLaneProperties.getAutoSetHeader()) { + return pjp.proceed(args); + } + + LOG.debug("kafka producer lane before, args: {}, thread laneId: {}", args, laneId); + + try { + ProducerRecord producerRecord; + if (args.length == 1) { + // ListenableFuture> send(ProducerRecord record); ListenableFuture> send(Message message); + if (args[0] instanceof Message) { + Message message = (Message) args[0]; + producerRecord = target.getMessageConverter().fromMessage(message, target.getDefaultTopic()); + // possibly no Jackson + if (!producerRecord.headers().iterator().hasNext()) { + byte[] correlationId = message.getHeaders().get(KafkaHeaders.CORRELATION_ID, byte[].class); + if (correlationId != null) { + producerRecord.headers().add(KafkaHeaders.CORRELATION_ID, correlationId); + } + } + } + else { + producerRecord = (ProducerRecord) args[0]; + } + } + else if (args.length == 2) { + // ListenableFuture> send(String topic, V data); + producerRecord = new ProducerRecord<>((String) args[0], args[1]); + } + else if (args.length == 3) { + // ListenableFuture> send(String topic, K key, V data); + producerRecord = new ProducerRecord<>((String) args[0], args[1], args[2]); + } + else if (args.length == 4) { + // ListenableFuture> send(String topic, Integer partition, K key, V data); + producerRecord = new ProducerRecord<>((String) args[0], (Integer) args[1], args[2], args[3]); + } + else if (args.length == 5) { + // ListenableFuture> send(String topic, Integer partition, Long timestamp, K key, V data); + producerRecord = new ProducerRecord<>((String) args[0], (Integer) args[1], (Long) args[2], args[3], args[4]); + } + else { + LOG.error("KafkaTemplate send message with wrong args: {}", args); + return pjp.proceed(args); + } + + Header laneHeader = new RecordHeader(laneHeaderKey, laneId.getBytes(StandardCharsets.UTF_8)); + producerRecord.headers().add(laneHeader); + + LOG.debug("kafka producer lane after, args: {}, laneId: {}", producerRecord, laneId); + + return target.send(producerRecord); + } + catch (Exception e) { + LOG.error("add laneId to kafka message error", e); + } + + return pjp.proceed(args); + } + + @Pointcut("@annotation(org.springframework.kafka.annotation.KafkaListener)") + private void consumerPointcut() { + } + + @Around("consumerPointcut()") + public Object aroundConsumerMessage(ProceedingJoinPoint pjp) throws Throwable { + Object[] args = pjp.getArgs(); + + if (!this.kafkaLaneProperties.getLaneOn()) { + return pjp.proceed(args); + } + // init metadata context + MetadataContextHolder.get(); + + try { + ConsumerRecord consumerRecord = null; + List messageList = null; + int dataPosition = -1; + Acknowledgment acknowledgment = null; + for (int i = 0; i < args.length; i++) { + if ((args[i] instanceof Acknowledgment)) { + acknowledgment = (Acknowledgment) args[i]; + } + else if ((args[i] instanceof ConsumerRecord)) { + consumerRecord = (ConsumerRecord) args[i]; + dataPosition = i; + } + else if (args[i] instanceof List) { + messageList = (List) args[i]; + dataPosition = i; + } + } + + // parameter is message list, for batch consume + if (messageList != null) { + // empty list directly return + if (messageList.isEmpty()) { + return pjp.proceed(args); + } + + // parameter is not consumerRecord, consume directly + if (!(messageList.get(0) instanceof ConsumerRecord)) { + return pjp.proceed(args); + } + + List newMessageList = new ArrayList<>(); + for (Object item : messageList) { + ConsumerRecord record = (ConsumerRecord) item; + String laneId = this.getConsumerRecordLaneId(record); + + boolean ifConsume = this.ifConsume(laneId); + if (ifConsume) { + newMessageList.add(record); + } + else { + if (acknowledgment != null) { + acknowledgment.acknowledge(); + } + if (LOG.isDebugEnabled()) { + LOG.debug("not need consume, laneId: {}, message:{}", laneId, record); + } + } + } + args[dataPosition] = newMessageList; + } + + // parameter is consumerRecord + if (consumerRecord != null) { + String laneId = this.getConsumerRecordLaneId(consumerRecord); + boolean ifConsume = this.ifConsume(laneId); + if (!ifConsume) { + if (acknowledgment != null) { + acknowledgment.acknowledge(); + } + if (LOG.isDebugEnabled()) { + LOG.debug("not need consume, laneId: {}, message:{}", laneId, consumerRecord); + } + return EMPTY_OBJECT; + } + } + } + catch (Exception e) { + LOG.error("extract laneId from kafka message error", e); + } + + Object result = pjp.proceed(args); + LaneUtils.removeCallerLaneId(); + return result; + } + + String getConsumerRecordLaneId(ConsumerRecord consumerRecord) { + + String laneId = null; + + Iterator
iterator = consumerRecord.headers().headers(laneHeaderKey).iterator(); + if (iterator.hasNext()) { + laneId = new String(iterator.next().value(), StandardCharsets.UTF_8); + } + // falls back to the laneId from the metadata context (set by the user's aspect) if the header is empty + if (StringUtils.isBlank(laneId)) { + laneId = LaneUtils.getCallerLaneId(); + // format laneId + if (laneId != null && !laneId.contains("/") && laneId.startsWith("lane-")) { + laneId = "tsf/" + laneId; + } + } + return laneId; + } + + /** + * whether the message is consumed by the current listener. + * @param messageLaneId message lane id. + * @return whether to consume. + */ + boolean ifConsume(String messageLaneId) { + if (TsfContextUtils.isOnlyTsfConsulEnabled() && tsfActiveLane != null) { + return ifConsumeInTsf(messageLaneId); + } + else { + return ifConsumeInPolaris(messageLaneId); + } + } + + private boolean ifConsumeInTsf(String originMessageLaneId) { + String laneId = originMessageLaneId; + if (laneId != null && laneId.contains("/")) { + laneId = laneId.split("/")[1]; + } + + Set groupLaneIdSet = tsfActiveLane.getCurrentGroupLaneIds(); + // message has no lane id + if (StringUtils.isEmpty(laneId)) { + if (groupLaneIdSet.isEmpty()) { + // baseline service, consume directly + return true; + } + else { + // lane listener consumes baseline message + return this.kafkaLaneProperties.getLaneConsumeMain(); + } + } + else { + LaneUtils.setCallerLaneId(originMessageLaneId); + + // message has lane id + if (groupLaneIdSet.isEmpty()) { + // baseline service + // message carries lane id but the current service's lane has no deployment groups, consume baseline + boolean consume = !tsfActiveLane.isLaneExist(laneId); + + // message carries lane id, but the current service's lane has deployment groups but is not active (or manually taken offline), consume baseline based on switch configuration, default is not to consume + consume = consume || + (this.kafkaLaneProperties.getMainConsumeLane() && + tsfActiveLane.isLaneExist(laneId) && + !tsfActiveLane.isActiveLane(laneId) + ); + return consume; + } + else { + return groupLaneIdSet.contains(laneId); + } + } + } + + private boolean ifConsumeInPolaris(String messageLaneId) { + // message has no lane id + if (StringUtils.isEmpty(messageLaneId)) { + if (StringUtils.isEmpty(lane)) { + // baseline service, consume directly + return true; + } + else { + // lane listener consumes baseline message + return this.kafkaLaneProperties.getLaneConsumeMain(); + } + } + else { + LaneUtils.setCallerLaneId(messageLaneId); + + // message has lane id + if (StringUtils.isEmpty(lane)) { + // baseline service + return this.kafkaLaneProperties.getMainConsumeLane(); + } + else { + ServiceKey localService = new ServiceKey(com.tencent.cloud.common.metadata.MetadataContext.LOCAL_NAMESPACE, + com.tencent.cloud.common.metadata.MetadataContext.LOCAL_SERVICE); + + Collection groups = LaneUtils.getLaneGroups(localService, polarisSDKContextManager.getSDKContext().getExtensions()); + // whether the message lane id is the same as the lane id of the listener + for (LaneProto.LaneGroup group : groups) { + for (LaneProto.LaneRule rule : group.getRulesList()) { + if (StringUtils.equals(messageLaneId, LaneUtils.buildStainLabel(rule)) + && StringUtils.equals(rule.getDefaultLabelValue(), lane)) { + return true; + } + } + } + return false; + } + } + } +} diff --git a/spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-kafka-lane-plugin/src/main/java/com/tencent/cloud/plugin/kafka/KafkaLaneAspectConfiguration.java b/spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-kafka-lane-plugin/src/main/java/com/tencent/cloud/plugin/kafka/KafkaLaneAspectConfiguration.java new file mode 100644 index 000000000..e8ee18948 --- /dev/null +++ b/spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-kafka-lane-plugin/src/main/java/com/tencent/cloud/plugin/kafka/KafkaLaneAspectConfiguration.java @@ -0,0 +1,48 @@ +/* + * Tencent is pleased to support the open source community by making spring-cloud-tencent available. + * + * Copyright (C) 2021 Tencent. All rights reserved. + * + * Licensed under the BSD 3-Clause License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://opensource.org/licenses/BSD-3-Clause + * + * Unless required by applicable law or agreed to in writing, software distributed + * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package com.tencent.cloud.plugin.kafka; + +import com.tencent.cloud.common.tsf.ConditionalOnOnlyTsfConsulEnabled; +import com.tencent.cloud.plugin.kafka.tsf.TsfActiveLane; +import com.tencent.cloud.polaris.context.PolarisSDKContextManager; +import com.tencent.cloud.polaris.discovery.PolarisDiscoveryHandler; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Configuration(proxyBeanMethods = false) +@EnableConfigurationProperties(KafkaLaneProperties.class) +public class KafkaLaneAspectConfiguration { + + @Bean + @ConditionalOnClass(name = {"org.springframework.kafka.core.KafkaTemplate"}) + public KafkaLaneAspect kafkaLaneAspect(PolarisSDKContextManager polarisSDKContextManager, + KafkaLaneProperties kafkaLaneProperties, @Autowired(required = false) TsfActiveLane tsfActiveLane) { + return new KafkaLaneAspect(polarisSDKContextManager, kafkaLaneProperties, tsfActiveLane); + } + + @Bean + @ConditionalOnClass(name = {"org.springframework.kafka.core.KafkaTemplate"}) + @ConditionalOnOnlyTsfConsulEnabled + public TsfActiveLane tsfActiveLane(PolarisSDKContextManager polarisSDKContextManager, PolarisDiscoveryHandler discoveryClient) { + return new TsfActiveLane(polarisSDKContextManager, discoveryClient); + } +} diff --git a/spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-kafka-lane-plugin/src/main/java/com/tencent/cloud/plugin/kafka/KafkaLaneProperties.java b/spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-kafka-lane-plugin/src/main/java/com/tencent/cloud/plugin/kafka/KafkaLaneProperties.java new file mode 100644 index 000000000..cf392bf63 --- /dev/null +++ b/spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-kafka-lane-plugin/src/main/java/com/tencent/cloud/plugin/kafka/KafkaLaneProperties.java @@ -0,0 +1,80 @@ +/* + * Tencent is pleased to support the open source community by making spring-cloud-tencent available. + * + * Copyright (C) 2021 Tencent. All rights reserved. + * + * Licensed under the BSD 3-Clause License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://opensource.org/licenses/BSD-3-Clause + * + * Unless required by applicable law or agreed to in writing, software distributed + * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package com.tencent.cloud.plugin.kafka; + +import org.springframework.boot.context.properties.ConfigurationProperties; + +@ConfigurationProperties(prefix = "spring.cloud.polaris.lane.kafka") +public class KafkaLaneProperties { + + /** + * enable kafka lane. + */ + private Boolean laneOn = false; + + /** + * lane listener whether to consume main message. + * scene: message without lane label, listener service with lane label, force lane service to consume main message. + * default false. + */ + private Boolean laneConsumeMain = false; + + /** + * main listener whether to consume lane message. + * scene: message with lane label, lane not deployed or not online, main consume lane message. + * default false. + */ + private Boolean mainConsumeLane = false; + + /** + * whether to set lane id to message header. + */ + private Boolean autoSetHeader = true; + + public Boolean getLaneOn() { + return laneOn; + } + + public void setLaneOn(Boolean laneOn) { + this.laneOn = laneOn; + } + + public Boolean getLaneConsumeMain() { + return laneConsumeMain; + } + + public void setLaneConsumeMain(Boolean laneConsumeMain) { + this.laneConsumeMain = laneConsumeMain; + } + + public Boolean getMainConsumeLane() { + return mainConsumeLane; + } + + public void setMainConsumeLane(Boolean mainConsumeLane) { + this.mainConsumeLane = mainConsumeLane; + } + + public Boolean getAutoSetHeader() { + return autoSetHeader; + } + + public void setAutoSetHeader(Boolean autoSetHeader) { + this.autoSetHeader = autoSetHeader; + } +} diff --git a/spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-kafka-lane-plugin/src/main/java/com/tencent/cloud/plugin/kafka/tsf/TsfActiveLane.java b/spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-kafka-lane-plugin/src/main/java/com/tencent/cloud/plugin/kafka/tsf/TsfActiveLane.java new file mode 100644 index 000000000..2bdc5e6f0 --- /dev/null +++ b/spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-kafka-lane-plugin/src/main/java/com/tencent/cloud/plugin/kafka/tsf/TsfActiveLane.java @@ -0,0 +1,186 @@ +/* + * Tencent is pleased to support the open source community by making spring-cloud-tencent available. + * + * Copyright (C) 2021 Tencent. All rights reserved. + * + * Licensed under the BSD 3-Clause License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://opensource.org/licenses/BSD-3-Clause + * + * Unless required by applicable law or agreed to in writing, software distributed + * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package com.tencent.cloud.plugin.kafka.tsf; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import com.tencent.cloud.common.util.JacksonUtils; +import com.tencent.cloud.polaris.context.PolarisSDKContextManager; +import com.tencent.cloud.polaris.discovery.PolarisDiscoveryHandler; +import com.tencent.cloud.polaris.discovery.refresh.ServiceInstanceChangeCallback; +import com.tencent.cloud.polaris.discovery.refresh.ServiceInstanceChangeListener; +import com.tencent.polaris.api.plugin.compose.Extensions; +import com.tencent.polaris.api.pojo.Instance; +import com.tencent.polaris.api.pojo.ServiceKey; +import com.tencent.polaris.api.utils.StringUtils; +import com.tencent.polaris.client.api.SDKContext; +import com.tencent.polaris.metadata.core.constant.TsfMetadataConstants; +import com.tencent.polaris.plugins.router.lane.LaneUtils; +import com.tencent.polaris.specification.api.v1.traffic.manage.LaneProto; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.springframework.beans.factory.InitializingBean; +import org.springframework.beans.factory.annotation.Value; + + +@ServiceInstanceChangeListener(serviceName = "${spring.application.name}") +public class TsfActiveLane implements ServiceInstanceChangeCallback, InitializingBean { + + private static final Logger LOG = LoggerFactory.getLogger(TsfActiveLane.class); + + private final PolarisSDKContextManager polarisSDKContextManager; + + private final PolarisDiscoveryHandler discoveryClient; + /** + * Online deployment groups for this service (same namespace id and application id required). + */ + private volatile Set activeGroupSet = new HashSet<>(); + + private volatile Set currentGroupLaneIds = null; + /** + * key: laneId. + * value: true - online, false - offline. + */ + private volatile Map laneActiveMap = new HashMap<>(); + + @Value("${tsf_namespace_id:}") + private String tsfNamespaceId; + + @Value("${tsf_group_id:}") + private String tsfGroupId; + + @Value("${tsf_application_id:}") + private String tsfApplicationId; + + @Value("${spring.application.name:}") + private String springApplicationName; + + public TsfActiveLane(PolarisSDKContextManager polarisSDKContextManager, PolarisDiscoveryHandler discoveryClient) { + this.polarisSDKContextManager = polarisSDKContextManager; + this.discoveryClient = discoveryClient; + Optional.ofNullable(polarisSDKContextManager).map(PolarisSDKContextManager::getSDKContext) + .map(SDKContext::getExtensions).map(Extensions::getLocalRegistry) + .ifPresent(localRegistry -> localRegistry.registerResourceListener(new TsfLaneRuleListener(this))); + } + + @Override + public void afterPropertiesSet() { + // get instances to trigger callback when instances change + discoveryClient.getHealthyInstances(springApplicationName); + } + + @Override + public void callback(List currentServiceInstances, List addServiceInstances, List deleteServiceInstances) { + if (LOG.isDebugEnabled()) { + LOG.debug("ConsulServiceChangeCallback currentServices: {}", JacksonUtils.serialize2Json(currentServiceInstances)); + LOG.debug("current namespaceId: {}, groupId: {}, applicationId: {}", tsfNamespaceId, tsfGroupId, tsfApplicationId); + } + + freshWhenInstancesChange(currentServiceInstances); + + if (LOG.isDebugEnabled()) { + LOG.info("current lane active status: {}", JacksonUtils.serialize2Json(laneActiveMap)); + } + } + + private void freshWhenInstancesChange(List currentServices) { + if (currentServices == null || currentServices.isEmpty()) { + return; + } + + Set currentActiveGroupSet = new HashSet<>(); + + // get all active groups + for (Instance healthService : currentServices) { + String nsId = healthService.getMetadata().get("TSF_NAMESPACE_ID"); + String groupId = healthService.getMetadata().get("TSF_GROUP_ID"); + String applicationId = healthService.getMetadata().get("TSF_APPLICATION_ID"); + if (tsfNamespaceId.equals(nsId) && tsfApplicationId.equals(applicationId) && StringUtils.isNotEmpty(groupId)) { + currentActiveGroupSet.add(groupId); + } + } + + activeGroupSet = currentActiveGroupSet; + + freshLaneStatus(); + } + + /** + * update lane status. + */ + public void freshLaneStatus() { + Map currentLaneActiveMap = new HashMap<>(); + Set tempCurrentGroupLaneIds = new HashSet<>(); + + ServiceKey localService = new ServiceKey(com.tencent.cloud.common.metadata.MetadataContext.LOCAL_NAMESPACE, + com.tencent.cloud.common.metadata.MetadataContext.LOCAL_SERVICE); + + List groups = LaneUtils.getLaneGroups(localService, polarisSDKContextManager.getSDKContext().getExtensions()); + + + for (LaneProto.LaneGroup laneGroup : groups) { + for (LaneProto.LaneRule laneRule : laneGroup.getRulesList()) { + // in tsf, if namespace id and application id are in lane rule, it means the service is in lane. + if (!laneGroup.getMetadataMap().containsKey(tsfNamespaceId + "," + tsfApplicationId)) { + continue; + } + // in tsf, lane label key is TsfMetadataConstants.TSF_GROUP_ID + if (!TsfMetadataConstants.TSF_GROUP_ID.equals(laneRule.getLabelKey()) + || StringUtils.isEmpty(laneRule.getDefaultLabelValue())) { + continue; + } + + for (String groupId : laneRule.getDefaultLabelValue().split(",")) { + if (activeGroupSet.contains(groupId)) { + // active group, update lane active status + currentLaneActiveMap.put(laneRule.getId(), true); + } + else { + // inactive group, mark lane as inactive only if no other active group exists + currentLaneActiveMap.putIfAbsent(laneRule.getId(), false); + } + if (StringUtils.equals(groupId, tsfGroupId)) { + tempCurrentGroupLaneIds.add(laneRule.getId()); + } + } + } + } + + laneActiveMap = currentLaneActiveMap; + currentGroupLaneIds = tempCurrentGroupLaneIds; + } + + public boolean isLaneExist(String laneId) { + return laneActiveMap.containsKey(laneId); + } + + public boolean isActiveLane(String laneId) { + return laneActiveMap.getOrDefault(laneId, false); + } + + public Set getCurrentGroupLaneIds() { + return currentGroupLaneIds; + } + +} diff --git a/spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-kafka-lane-plugin/src/main/java/com/tencent/cloud/plugin/kafka/tsf/TsfLaneRuleListener.java b/spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-kafka-lane-plugin/src/main/java/com/tencent/cloud/plugin/kafka/tsf/TsfLaneRuleListener.java new file mode 100644 index 000000000..7ae86b0d3 --- /dev/null +++ b/spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-kafka-lane-plugin/src/main/java/com/tencent/cloud/plugin/kafka/tsf/TsfLaneRuleListener.java @@ -0,0 +1,58 @@ +/* + * Tencent is pleased to support the open source community by making spring-cloud-tencent available. + * + * Copyright (C) 2021 Tencent. All rights reserved. + * + * Licensed under the BSD 3-Clause License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://opensource.org/licenses/BSD-3-Clause + * + * Unless required by applicable law or agreed to in writing, software distributed + * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package com.tencent.cloud.plugin.kafka.tsf; + +import com.tencent.polaris.api.plugin.registry.AbstractResourceEventListener; +import com.tencent.polaris.api.pojo.RegistryCacheValue; +import com.tencent.polaris.api.pojo.ServiceEventKey; + +public class TsfLaneRuleListener extends AbstractResourceEventListener { + + private final TsfActiveLane tsfActiveLane; + + public TsfLaneRuleListener(TsfActiveLane tsfActiveLane) { + this.tsfActiveLane = tsfActiveLane; + } + + @Override + public void onResourceAdd(ServiceEventKey svcEventKey, RegistryCacheValue newValue) { + if (svcEventKey.getEventType() != ServiceEventKey.EventType.LANE_RULE) { + return; + } + + tsfActiveLane.freshLaneStatus(); + } + + @Override + public void onResourceUpdated(ServiceEventKey svcEventKey, RegistryCacheValue oldValue, RegistryCacheValue newValue) { + if (svcEventKey.getEventType() != ServiceEventKey.EventType.LANE_RULE) { + return; + } + + tsfActiveLane.freshLaneStatus(); + } + + @Override + public void onResourceDeleted(ServiceEventKey svcEventKey, RegistryCacheValue oldValue) { + if (svcEventKey.getEventType() != ServiceEventKey.EventType.LANE_RULE) { + return; + } + + tsfActiveLane.freshLaneStatus(); + } +} diff --git a/spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-kafka-lane-plugin/src/main/resources/META-INF/spring.factories b/spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-kafka-lane-plugin/src/main/resources/META-INF/spring.factories new file mode 100644 index 000000000..5a8d804fe --- /dev/null +++ b/spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-kafka-lane-plugin/src/main/resources/META-INF/spring.factories @@ -0,0 +1,2 @@ +org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ + com.tencent.cloud.plugin.kafka.KafkaLaneAspectConfiguration diff --git a/spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-kafka-lane-plugin/src/test/java/com/tencent/cloud/plugin/kafka/KafkaLaneAspectConfigurationTest.java b/spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-kafka-lane-plugin/src/test/java/com/tencent/cloud/plugin/kafka/KafkaLaneAspectConfigurationTest.java new file mode 100644 index 000000000..5ba3102b5 --- /dev/null +++ b/spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-kafka-lane-plugin/src/test/java/com/tencent/cloud/plugin/kafka/KafkaLaneAspectConfigurationTest.java @@ -0,0 +1,189 @@ +/* + * Tencent is pleased to support the open source community by making spring-cloud-tencent available. + * + * Copyright (C) 2021 Tencent. All rights reserved. + * + * Licensed under the BSD 3-Clause License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://opensource.org/licenses/BSD-3-Clause + * + * Unless required by applicable law or agreed to in writing, software distributed + * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package com.tencent.cloud.plugin.kafka; + +import java.lang.reflect.Field; +import java.util.concurrent.atomic.AtomicBoolean; + +import com.tencent.cloud.common.tsf.TsfContextUtils; +import com.tencent.cloud.plugin.kafka.tsf.TsfActiveLane; +import com.tencent.cloud.polaris.context.PolarisSDKContextManager; +import com.tencent.cloud.polaris.discovery.PolarisDiscoveryHandler; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import org.springframework.boot.autoconfigure.AutoConfigurations; +import org.springframework.boot.test.context.runner.ApplicationContextRunner; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; + +/** + * Test for {@link KafkaLaneAspectConfiguration}. + */ +public class KafkaLaneAspectConfigurationTest { + + private ApplicationContextRunner contextRunner; + + @BeforeEach + public void setUp() { + contextRunner = new ApplicationContextRunner() + .withConfiguration(AutoConfigurations.of(KafkaLaneAspectConfiguration.class)) + .withUserConfiguration(MockConfiguration.class); + } + + @AfterEach + public void tearDown() { + // Reset static state for clean tests using reflection + resetTsfContextUtilsStaticFields(); + } + + private void resetTsfContextUtilsStaticFields() { + try { + // Reset isOnlyTsfConsulEnabledFirstConfiguration + Field isOnlyTsfConsulEnabledFirstConfigurationField = TsfContextUtils.class.getDeclaredField("isOnlyTsfConsulEnabledFirstConfiguration"); + isOnlyTsfConsulEnabledFirstConfigurationField.setAccessible(true); + AtomicBoolean isOnlyTsfConsulEnabledFirstConfiguration = (AtomicBoolean) isOnlyTsfConsulEnabledFirstConfigurationField.get(null); + isOnlyTsfConsulEnabledFirstConfiguration.set(true); + + // Reset isTsfConsulEnabledFirstConfiguration + Field isTsfConsulEnabledFirstConfigurationField = TsfContextUtils.class.getDeclaredField("isTsfConsulEnabledFirstConfiguration"); + isTsfConsulEnabledFirstConfigurationField.setAccessible(true); + AtomicBoolean isTsfConsulEnabledFirstConfiguration = (AtomicBoolean) isTsfConsulEnabledFirstConfigurationField.get(null); + isTsfConsulEnabledFirstConfiguration.set(true); + + // Reset tsfConsulEnabled + Field tsfConsulEnabledField = TsfContextUtils.class.getDeclaredField("tsfConsulEnabled"); + tsfConsulEnabledField.setAccessible(true); + tsfConsulEnabledField.setBoolean(null, false); + + // Reset onlyTsfConsulEnabled + Field onlyTsfConsulEnabledField = TsfContextUtils.class.getDeclaredField("onlyTsfConsulEnabled"); + onlyTsfConsulEnabledField.setAccessible(true); + onlyTsfConsulEnabledField.setBoolean(null, false); + + } + catch (Exception e) { + throw new RuntimeException("Failed to reset TsfContextUtils static fields", e); + } + } + + @Test + public void testKafkaLaneAspectBeanCreationWhenKafkaTemplatePresent() { + resetTsfContextUtilsStaticFields(); + // Simulate KafkaTemplate class presence + contextRunner + .withPropertyValues("spring.cloud.polaris.lane.kafka.lane-on=true") + .run(context -> { + // KafkaLaneAspect should be created when KafkaTemplate is present + assertThat(context).hasSingleBean(KafkaLaneAspect.class); + + KafkaLaneAspect aspect = context.getBean(KafkaLaneAspect.class); + assertThat(aspect).isNotNull(); + }); + } + + @Test + public void testTsfActiveLaneBeanNotCreatedWhenPolarisAddressPresent() { + resetTsfContextUtilsStaticFields(); + // Simulate Polaris address present (not only TSF Consul) + contextRunner + .withPropertyValues( + "tsf_consul_enable=true", + "tsf_consul_ip=127.0.0.1", + "spring.cloud.polaris.address=127.0.0.1:8091" + ) + .run(context -> { + // TsfActiveLane should NOT be created when Polaris address is present + assertThat(context).doesNotHaveBean(TsfActiveLane.class); + }); + } + + @Test + public void testKafkaLanePropertiesEnabled() { + resetTsfContextUtilsStaticFields(); + contextRunner + .withPropertyValues( + "spring.cloud.polaris.lane.kafka.lane-on=true", + "spring.cloud.polaris.lane.kafka.lane-consume-main=true", + "spring.cloud.polaris.lane.kafka.main-consume-lane=true" + ) + .run(context -> { + KafkaLaneProperties properties = context.getBean(KafkaLaneProperties.class); + assertThat(properties.getLaneOn()).isTrue(); + assertThat(properties.getLaneConsumeMain()).isTrue(); + assertThat(properties.getMainConsumeLane()).isTrue(); + }); + } + + @Test + public void testBeanDependenciesInjection() { + resetTsfContextUtilsStaticFields(); + contextRunner + .withPropertyValues("spring.cloud.polaris.lane.kafka.lane-on=true") + .run(context -> { + // Verify that all required dependencies are properly injected + assertThat(context).hasSingleBean(KafkaLaneAspect.class); + + KafkaLaneAspect aspect = context.getBean(KafkaLaneAspect.class); + + // The aspect should have all required dependencies + assertThat(aspect).isNotNull(); + + // Verify that KafkaLaneProperties is properly configured + KafkaLaneProperties properties = context.getBean(KafkaLaneProperties.class); + assertThat(properties).isNotNull(); + assertThat(properties.getLaneOn()).isTrue(); + }); + } + + @Test + public void testTsfActiveLaneBeanCreationWhenOnlyTsfConsulEnabled() { + resetTsfContextUtilsStaticFields(); + // Simulate Only TSF Consul enabled condition + contextRunner + .withPropertyValues( + "tsf_consul_enable=true", + "tsf_consul_ip=127.0.0.1", + "spring.cloud.polaris.address=" // Empty to simulate only TSF Consul + ) + .run(context -> { + // TsfActiveLane should be created when only TSF Consul is enabled + assertThat(context).hasSingleBean(TsfActiveLane.class); + + TsfActiveLane tsfActiveLane = context.getBean(TsfActiveLane.class); + assertThat(tsfActiveLane).isNotNull(); + }); + } + + @Configuration + static class MockConfiguration { + @Bean + public PolarisSDKContextManager polarisSDKContextManager() { + return mock(PolarisSDKContextManager.class); + } + + @Bean + public PolarisDiscoveryHandler polarisDiscoveryHandler() { + return mock(PolarisDiscoveryHandler.class); + } + } +} diff --git a/spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-kafka-lane-plugin/src/test/java/com/tencent/cloud/plugin/kafka/KafkaLaneAspectTest.java b/spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-kafka-lane-plugin/src/test/java/com/tencent/cloud/plugin/kafka/KafkaLaneAspectTest.java new file mode 100644 index 000000000..1a5a46af2 --- /dev/null +++ b/spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-kafka-lane-plugin/src/test/java/com/tencent/cloud/plugin/kafka/KafkaLaneAspectTest.java @@ -0,0 +1,349 @@ +/* + * Tencent is pleased to support the open source community by making spring-cloud-tencent available. + * + * Copyright (C) 2021 Tencent. All rights reserved. + * + * Licensed under the BSD 3-Clause License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://opensource.org/licenses/BSD-3-Clause + * + * Unless required by applicable law or agreed to in writing, software distributed + * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package com.tencent.cloud.plugin.kafka; + +import java.lang.reflect.Field; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; + +import com.tencent.cloud.common.tsf.TsfContextUtils; +import com.tencent.cloud.plugin.kafka.tsf.TsfActiveLane; +import com.tencent.cloud.polaris.context.PolarisSDKContextManager; +import com.tencent.polaris.plugins.router.lane.LaneUtils; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.header.Header; +import org.aspectj.lang.ProceedingJoinPoint; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.MockedStatic; +import org.mockito.Mockito; + +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.support.Acknowledgment; +import org.springframework.kafka.support.converter.RecordMessageConverter; +import org.springframework.messaging.Message; +import org.springframework.messaging.support.MessageBuilder; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +/** + * Test for {@link KafkaLaneAspect}. + */ +public class KafkaLaneAspectTest { + + private KafkaLaneAspect kafkaLaneAspect; + private PolarisSDKContextManager polarisSDKContextManager; + private KafkaLaneProperties kafkaLaneProperties; + private TsfActiveLane tsfActiveLane; + private MockedStatic laneUtilsMockedStatic; + private MockedStatic tsfContextUtilsMockedStatic; + + @BeforeEach + public void setUp() { + polarisSDKContextManager = mock(PolarisSDKContextManager.class); + kafkaLaneProperties = new KafkaLaneProperties(); + tsfActiveLane = mock(TsfActiveLane.class); + + laneUtilsMockedStatic = Mockito.mockStatic(LaneUtils.class); + tsfContextUtilsMockedStatic = Mockito.mockStatic(TsfContextUtils.class); + + kafkaLaneAspect = new KafkaLaneAspect(polarisSDKContextManager, kafkaLaneProperties, tsfActiveLane); + } + + @AfterEach + public void tearDown() { + laneUtilsMockedStatic.close(); + tsfContextUtilsMockedStatic.close(); + resetTsfContextUtilsStaticFields(); + } + + private void resetTsfContextUtilsStaticFields() { + try { + // Reset isOnlyTsfConsulEnabledFirstConfiguration + Field isOnlyTsfConsulEnabledFirstConfigurationField = TsfContextUtils.class.getDeclaredField("isOnlyTsfConsulEnabledFirstConfiguration"); + isOnlyTsfConsulEnabledFirstConfigurationField.setAccessible(true); + AtomicBoolean isOnlyTsfConsulEnabledFirstConfiguration = (AtomicBoolean) isOnlyTsfConsulEnabledFirstConfigurationField.get(null); + isOnlyTsfConsulEnabledFirstConfiguration.set(true); + + // Reset isTsfConsulEnabledFirstConfiguration + Field isTsfConsulEnabledFirstConfigurationField = TsfContextUtils.class.getDeclaredField("isTsfConsulEnabledFirstConfiguration"); + isTsfConsulEnabledFirstConfigurationField.setAccessible(true); + AtomicBoolean isTsfConsulEnabledFirstConfiguration = (AtomicBoolean) isTsfConsulEnabledFirstConfigurationField.get(null); + isTsfConsulEnabledFirstConfiguration.set(true); + + // Reset tsfConsulEnabled + Field tsfConsulEnabledField = TsfContextUtils.class.getDeclaredField("tsfConsulEnabled"); + tsfConsulEnabledField.setAccessible(true); + tsfConsulEnabledField.setBoolean(null, false); + + // Reset onlyTsfConsulEnabled + Field onlyTsfConsulEnabledField = TsfContextUtils.class.getDeclaredField("onlyTsfConsulEnabled"); + onlyTsfConsulEnabledField.setAccessible(true); + onlyTsfConsulEnabledField.setBoolean(null, false); + + } + catch (Exception e) { + throw new RuntimeException("Failed to reset TsfContextUtils static fields", e); + } + } + + @Test + public void testProducerAspectWhenLaneDisabled() throws Throwable { + // Given + kafkaLaneProperties.setLaneOn(false); + ProceedingJoinPoint pjp = mock(ProceedingJoinPoint.class); + Object[] args = new Object[] {mock(ProducerRecord.class)}; + when(pjp.getArgs()).thenReturn(args); + when(pjp.proceed(args)).thenReturn("result"); + + // When + Object result = kafkaLaneAspect.aroundProducerMessage(pjp); + + // Then + assertThat(result).isEqualTo("result"); + verify(pjp).proceed(args); + } + + @Test + public void testProducerAspectWhenNoLaneId() throws Throwable { + // Given + kafkaLaneProperties.setLaneOn(true); + laneUtilsMockedStatic.when(() -> LaneUtils.fetchLaneByCaller(any(), any(), any())).thenReturn(null); + + ProceedingJoinPoint pjp = mock(ProceedingJoinPoint.class); + Object[] args = new Object[] {mock(ProducerRecord.class)}; + when(pjp.getArgs()).thenReturn(args); + when(pjp.proceed(args)).thenReturn("result"); + + // When + Object result = kafkaLaneAspect.aroundProducerMessage(pjp); + + // Then + assertThat(result).isEqualTo("result"); + verify(pjp).proceed(args); + } + + @Test + public void testProducerAspectWithProducerRecord() throws Throwable { + // Given + kafkaLaneProperties.setLaneOn(true); + String laneId = "test-lane-id"; + laneUtilsMockedStatic.when(() -> LaneUtils.fetchLaneByCaller(any(), any(), any())).thenReturn(laneId); + + ProceedingJoinPoint pjp = mock(ProceedingJoinPoint.class); + KafkaTemplate kafkaTemplate = mock(KafkaTemplate.class); + ProducerRecord producerRecord = new ProducerRecord<>("test-topic", "test-value"); + + when(pjp.getTarget()).thenReturn(kafkaTemplate); + when(pjp.getArgs()).thenReturn(new Object[] {producerRecord}); + when(kafkaTemplate.send(producerRecord)).thenReturn(null); + + // When + kafkaLaneAspect.aroundProducerMessage(pjp); + + // Then + Iterator
headers = producerRecord.headers().headers("tsf_laneId").iterator(); + assertThat(headers.hasNext()).isTrue(); + Header laneHeader = headers.next(); + assertThat(new String(laneHeader.value(), StandardCharsets.UTF_8)).isEqualTo(laneId); + } + + @Test + public void testProducerAspectWithMessage() throws Throwable { + // Given + kafkaLaneProperties.setLaneOn(true); + String laneId = "test-lane-id"; + laneUtilsMockedStatic.when(() -> LaneUtils.fetchLaneByCaller(any(), any(), any())).thenReturn(laneId); + + ProceedingJoinPoint pjp = mock(ProceedingJoinPoint.class); + KafkaTemplate kafkaTemplate = mock(KafkaTemplate.class); + RecordMessageConverter recordMessageConverter = mock(RecordMessageConverter.class); + Message message = MessageBuilder.withPayload("test-payload").build(); + ProducerRecord producerRecord = new ProducerRecord<>("test-topic", "test-value"); + + when(pjp.getTarget()).thenReturn(kafkaTemplate); + when(pjp.getArgs()).thenReturn(new Object[] {message}); + when(kafkaTemplate.getDefaultTopic()).thenReturn("test-topic"); + when(kafkaTemplate.send(producerRecord)).thenReturn(null); + when(kafkaTemplate.getMessageConverter()).thenReturn(recordMessageConverter); + + // When + kafkaLaneAspect.aroundProducerMessage(pjp); + } + + @Test + public void testConsumerAspectWhenLaneDisabled() throws Throwable { + // Given + kafkaLaneProperties.setLaneOn(false); + ProceedingJoinPoint pjp = mock(ProceedingJoinPoint.class); + Object[] args = new Object[] {mock(ConsumerRecord.class)}; + when(pjp.getArgs()).thenReturn(args); + when(pjp.proceed(args)).thenReturn("result"); + + // When + Object result = kafkaLaneAspect.aroundConsumerMessage(pjp); + + // Then + assertThat(result).isEqualTo("result"); + verify(pjp).proceed(args); + } + + @Test + public void testConsumerAspectWithLaneHeader() throws Throwable { + // Given + kafkaLaneProperties.setLaneOn(true); + String laneId = "test-lane-id"; + + ConsumerRecord consumerRecord = new ConsumerRecord<>("test-topic", 0, 0L, "key", "value"); + consumerRecord.headers().add("tsf_laneId", laneId.getBytes(StandardCharsets.UTF_8)); + + ProceedingJoinPoint pjp = mock(ProceedingJoinPoint.class); + Object[] args = new Object[] {consumerRecord}; + when(pjp.getArgs()).thenReturn(args); + when(pjp.proceed(args)).thenReturn("result"); + + // When + Object result = kafkaLaneAspect.aroundConsumerMessage(pjp); + + // Then + assertThat(result).isEqualTo(KafkaLaneAspect.EMPTY_OBJECT); + } + + @Test + public void testGetConsumerRecordLaneIdFromHeader() { + // Given + ConsumerRecord consumerRecord = new ConsumerRecord<>("test-topic", 0, 0L, "key", "value"); + String expectedLaneId = "test-lane-id"; + consumerRecord.headers().add("tsf_laneId", expectedLaneId.getBytes(StandardCharsets.UTF_8)); + + // When + String laneId = kafkaLaneAspect.getConsumerRecordLaneId(consumerRecord); + + // Then + assertThat(laneId).isEqualTo(expectedLaneId); + } + + @Test + public void testGetConsumerRecordLaneIdFromCallerLane() { + // Given + ConsumerRecord consumerRecord = new ConsumerRecord<>("test-topic", 0, 0L, "key", "value"); + String expectedLaneId = "lane-test"; + laneUtilsMockedStatic.when(() -> LaneUtils.getCallerLaneId()).thenReturn(expectedLaneId); + + // When + String laneId = kafkaLaneAspect.getConsumerRecordLaneId(consumerRecord); + + // Then + assertThat(laneId).isEqualTo("tsf/" + expectedLaneId); + } + + @Test + public void testIfConsumeInPolarisWithNoLaneId() { + // Given + kafkaLaneProperties.setLaneConsumeMain(true); + + // Use reflection to set the lane field + try { + Field laneField = KafkaLaneAspect.class.getDeclaredField("lane"); + laneField.setAccessible(true); + laneField.set(kafkaLaneAspect, "test-lane"); + } + catch (Exception e) { + throw new RuntimeException(e); + } + + // When + boolean shouldConsume = kafkaLaneAspect.ifConsume(""); + + // Then + assertThat(shouldConsume).isTrue(); // Because laneConsumeMain is true + } + + @Test + public void testIfConsumeInTsfWithNoLaneId() { + // Given + kafkaLaneProperties.setLaneConsumeMain(true); + tsfContextUtilsMockedStatic.when(TsfContextUtils::isOnlyTsfConsulEnabled).thenReturn(true); + when(tsfActiveLane.getCurrentGroupLaneIds()).thenReturn(Collections.emptySet()); + + // When + boolean shouldConsume = kafkaLaneAspect.ifConsume(""); + + // Then + assertThat(shouldConsume).isTrue(); // Because laneConsumeMain is true + } + + @Test + public void testConsumerAspectWithBatchMessages() throws Throwable { + // Given + kafkaLaneProperties.setLaneOn(true); + + List messageList = new ArrayList<>(); + ConsumerRecord record1 = new ConsumerRecord<>("test-topic", 0, 0L, "key", "value1"); + record1.headers().add("tsf_laneId", "lane1".getBytes(StandardCharsets.UTF_8)); + ConsumerRecord record2 = new ConsumerRecord<>("test-topic", 0, 1L, "key", "value2"); + record2.headers().add("tsf_laneId", "lane2".getBytes(StandardCharsets.UTF_8)); + + messageList.add(record1); + messageList.add(record2); + + Acknowledgment acknowledgment = mock(Acknowledgment.class); + ProceedingJoinPoint pjp = mock(ProceedingJoinPoint.class); + Object[] args = new Object[] {messageList, acknowledgment}; + + when(pjp.getArgs()).thenReturn(args); + when(pjp.proceed(any())).thenReturn("result"); + + // When + Object result = kafkaLaneAspect.aroundConsumerMessage(pjp); + + // Then + assertThat(result).isEqualTo("result"); + verify(pjp).proceed(any()); + } + + @Test + public void testConsumerAspectWithEmptyBatch() throws Throwable { + // Given + kafkaLaneProperties.setLaneOn(true); + + List emptyList = new ArrayList<>(); + ProceedingJoinPoint pjp = mock(ProceedingJoinPoint.class); + Object[] args = new Object[] {emptyList}; + + when(pjp.getArgs()).thenReturn(args); + when(pjp.proceed(args)).thenReturn("result"); + + // When + Object result = kafkaLaneAspect.aroundConsumerMessage(pjp); + + // Then + assertThat(result).isEqualTo("result"); + verify(pjp).proceed(args); + } +} diff --git a/spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-kafka-lane-plugin/src/test/java/com/tencent/cloud/plugin/kafka/KafkaLanePropertiesTest.java b/spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-kafka-lane-plugin/src/test/java/com/tencent/cloud/plugin/kafka/KafkaLanePropertiesTest.java new file mode 100644 index 000000000..2a5c927cd --- /dev/null +++ b/spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-kafka-lane-plugin/src/test/java/com/tencent/cloud/plugin/kafka/KafkaLanePropertiesTest.java @@ -0,0 +1,108 @@ +/* + * Tencent is pleased to support the open source community by making spring-cloud-tencent available. + * + * Copyright (C) 2021 Tencent. All rights reserved. + * + * Licensed under the BSD 3-Clause License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://opensource.org/licenses/BSD-3-Clause + * + * Unless required by applicable law or agreed to in writing, software distributed + * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package com.tencent.cloud.plugin.kafka; + +import org.junit.jupiter.api.Test; + +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.boot.test.context.runner.ApplicationContextRunner; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Test for {@link KafkaLaneProperties}. + */ +public class KafkaLanePropertiesTest { + + private final ApplicationContextRunner contextRunner = new ApplicationContextRunner() + .withUserConfiguration(TestConfiguration.class); + + @Test + public void testDefaultValues() { + this.contextRunner.run(context -> { + KafkaLaneProperties properties = context.getBean(KafkaLaneProperties.class); + + // test default values + assertThat(properties.getLaneOn()).isFalse(); + assertThat(properties.getLaneConsumeMain()).isFalse(); + assertThat(properties.getMainConsumeLane()).isFalse(); + }); + } + + @Test + public void testConfigurationPropertiesPrefix() { + this.contextRunner + .withPropertyValues( + "spring.cloud.polaris.lane.kafka.lane-on=true", + "spring.cloud.polaris.lane.kafka.lane-consume-main=true", + "spring.cloud.polaris.lane.kafka.main-consume-lane=true" + ) + .run(context -> { + KafkaLaneProperties properties = context.getBean(KafkaLaneProperties.class); + + // test properties + assertThat(properties.getLaneOn()).isTrue(); + assertThat(properties.getLaneConsumeMain()).isTrue(); + assertThat(properties.getMainConsumeLane()).isTrue(); + }); + } + + @Test + public void testGetterAndSetter() { + KafkaLaneProperties properties = new KafkaLaneProperties(); + + // test laneOn property + properties.setLaneOn(true); + assertThat(properties.getLaneOn()).isTrue(); + properties.setLaneOn(false); + assertThat(properties.getLaneOn()).isFalse(); + + // test laneConsumeMain property + properties.setLaneConsumeMain(true); + assertThat(properties.getLaneConsumeMain()).isTrue(); + properties.setLaneConsumeMain(false); + assertThat(properties.getLaneConsumeMain()).isFalse(); + + // test mainConsumeLane property + properties.setMainConsumeLane(true); + assertThat(properties.getMainConsumeLane()).isTrue(); + properties.setMainConsumeLane(false); + assertThat(properties.getMainConsumeLane()).isFalse(); + } + + @Test + public void testPartialConfiguration() { + this.contextRunner + .withPropertyValues( + "spring.cloud.polaris.lane.kafka.lane-on=true" + ) + .run(context -> { + KafkaLaneProperties properties = context.getBean(KafkaLaneProperties.class); + + // test laneOn property + assertThat(properties.getLaneOn()).isTrue(); + // test other properties keep default values + assertThat(properties.getLaneConsumeMain()).isFalse(); + assertThat(properties.getMainConsumeLane()).isFalse(); + }); + } + + @EnableConfigurationProperties(KafkaLaneProperties.class) + static class TestConfiguration { + } +} diff --git a/spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-kafka-lane-plugin/src/test/java/com/tencent/cloud/plugin/kafka/tsf/TsfActiveLaneTest.java b/spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-kafka-lane-plugin/src/test/java/com/tencent/cloud/plugin/kafka/tsf/TsfActiveLaneTest.java new file mode 100644 index 000000000..612ffab77 --- /dev/null +++ b/spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-kafka-lane-plugin/src/test/java/com/tencent/cloud/plugin/kafka/tsf/TsfActiveLaneTest.java @@ -0,0 +1,252 @@ +/* + * Tencent is pleased to support the open source community by making spring-cloud-tencent available. + * + * Copyright (C) 2021 Tencent. All rights reserved. + * + * Licensed under the BSD 3-Clause License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://opensource.org/licenses/BSD-3-Clause + * + * Unless required by applicable law or agreed to in writing, software distributed + * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package com.tencent.cloud.plugin.kafka.tsf; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeSet; + +import com.tencent.cloud.polaris.context.PolarisSDKContextManager; +import com.tencent.cloud.polaris.discovery.PolarisDiscoveryHandler; +import com.tencent.polaris.api.plugin.compose.Extensions; +import com.tencent.polaris.api.pojo.Instance; +import com.tencent.polaris.api.pojo.ServiceKey; +import com.tencent.polaris.client.api.SDKContext; +import com.tencent.polaris.metadata.core.constant.TsfMetadataConstants; +import com.tencent.polaris.plugins.router.lane.LaneUtils; +import com.tencent.polaris.specification.api.v1.traffic.manage.LaneProto; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.MockedStatic; +import org.mockito.Mockito; + +import org.springframework.test.util.ReflectionTestUtils; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +/** + * Test for {@link TsfActiveLane}. + */ +public class TsfActiveLaneTest { + + private TsfActiveLane tsfActiveLane; + private PolarisSDKContextManager polarisSDKContextManager; + private PolarisDiscoveryHandler discoveryClient; + private SDKContext sdkContext; + private Extensions extensions; + private MockedStatic laneUtilsMockedStatic; + + @BeforeEach + public void setUp() { + polarisSDKContextManager = mock(PolarisSDKContextManager.class); + discoveryClient = mock(PolarisDiscoveryHandler.class); + sdkContext = mock(SDKContext.class); + extensions = mock(Extensions.class); + + laneUtilsMockedStatic = Mockito.mockStatic(LaneUtils.class); + + when(polarisSDKContextManager.getSDKContext()).thenReturn(sdkContext); + when(sdkContext.getExtensions()).thenReturn(extensions); + + tsfActiveLane = new TsfActiveLane(polarisSDKContextManager, discoveryClient); + + // Set up field values using reflection + ReflectionTestUtils.setField(tsfActiveLane, "tsfNamespaceId", "test-namespace"); + ReflectionTestUtils.setField(tsfActiveLane, "tsfGroupId", "test-group"); + ReflectionTestUtils.setField(tsfActiveLane, "tsfApplicationId", "test-app"); + ReflectionTestUtils.setField(tsfActiveLane, "springApplicationName", "test-service"); + } + + @AfterEach + public void tearDown() { + laneUtilsMockedStatic.close(); + } + + @Test + public void testConstructorInitialization() { + // Verify that constructor properly initializes dependencies + assertThat(tsfActiveLane).isNotNull(); + verify(polarisSDKContextManager, times(1)).getSDKContext(); + verify(sdkContext, times(1)).getExtensions(); + } + + @Test + public void testCallbackWithEmptyInstances() { + // Given + List currentInstances = Collections.emptyList(); + List addInstances = Collections.emptyList(); + List deleteInstances = Collections.emptyList(); + + // When + tsfActiveLane.callback(currentInstances, addInstances, deleteInstances); + + // Then + // Should not throw any exceptions and handle empty instances gracefully + } + + @Test + public void testFreshLaneStatusWithActiveGroups() { + // Given + Set activeGroups = new HashSet<>(Arrays.asList("group1", "group2")); + ReflectionTestUtils.setField(tsfActiveLane, "activeGroupSet", activeGroups); + + // Mock lane groups and rules + LaneProto.LaneGroup laneGroup = mock(LaneProto.LaneGroup.class); + LaneProto.LaneRule laneRule = mock(LaneProto.LaneRule.class); + + when(laneGroup.getMetadataMap()).thenReturn(createMetadataMap("test-namespace,test-app")); + when(laneGroup.getRulesList()).thenReturn(Collections.singletonList(laneRule)); + when(laneRule.getLabelKey()).thenReturn(TsfMetadataConstants.TSF_GROUP_ID); + when(laneRule.getDefaultLabelValue()).thenReturn("test-group,group2,group3"); + when(laneRule.getId()).thenReturn("lane1"); + + laneUtilsMockedStatic.when(() -> LaneUtils.getLaneGroups(any(ServiceKey.class), any(Extensions.class))) + .thenReturn(Collections.singletonList(laneGroup)); + + // When + ReflectionTestUtils.invokeMethod(tsfActiveLane, "freshLaneStatus"); + + // Then + Map laneActiveMap = (Map) ReflectionTestUtils.getField(tsfActiveLane, "laneActiveMap"); + Set currentGroupLaneIds = (Set) ReflectionTestUtils.getField(tsfActiveLane, "currentGroupLaneIds"); + + assertThat(laneActiveMap).containsEntry("lane1", true); + assertThat(currentGroupLaneIds).contains("lane1"); + } + + @Test + public void testFreshLaneStatusWithInactiveGroups() { + // Given + Set activeGroups = new HashSet<>(Collections.singletonList("group1")); + ReflectionTestUtils.setField(tsfActiveLane, "activeGroupSet", activeGroups); + + // Mock lane groups and rules + LaneProto.LaneGroup laneGroup = mock(LaneProto.LaneGroup.class); + LaneProto.LaneRule laneRule = mock(LaneProto.LaneRule.class); + + when(laneGroup.getMetadataMap()).thenReturn(createMetadataMap("test-namespace,test-app")); + when(laneGroup.getRulesList()).thenReturn(Collections.singletonList(laneRule)); + when(laneRule.getLabelKey()).thenReturn(TsfMetadataConstants.TSF_GROUP_ID); + when(laneRule.getDefaultLabelValue()).thenReturn("group2,group3"); + when(laneRule.getId()).thenReturn("lane1"); + + laneUtilsMockedStatic.when(() -> LaneUtils.getLaneGroups(any(ServiceKey.class), any(Extensions.class))) + .thenReturn(Collections.singletonList(laneGroup)); + + // When + ReflectionTestUtils.invokeMethod(tsfActiveLane, "freshLaneStatus"); + + // Then + Map laneActiveMap = (Map) ReflectionTestUtils.getField(tsfActiveLane, "laneActiveMap"); + assertThat(laneActiveMap).containsEntry("lane1", false); + } + + @Test + public void testIsLaneExist() { + // Given + Map laneActiveMap = new HashMap<>(); + laneActiveMap.put("lane1", true); + laneActiveMap.put("lane2", false); + ReflectionTestUtils.setField(tsfActiveLane, "laneActiveMap", laneActiveMap); + + // When & Then + assertThat(tsfActiveLane.isLaneExist("lane1")).isTrue(); + assertThat(tsfActiveLane.isLaneExist("lane2")).isTrue(); + assertThat(tsfActiveLane.isLaneExist("lane3")).isFalse(); + } + + @Test + public void testIsActiveLane() { + // Given + Map laneActiveMap = new HashMap<>(); + laneActiveMap.put("lane1", true); + laneActiveMap.put("lane2", false); + ReflectionTestUtils.setField(tsfActiveLane, "laneActiveMap", laneActiveMap); + + // When & Then + assertThat(tsfActiveLane.isActiveLane("lane1")).isTrue(); + assertThat(tsfActiveLane.isActiveLane("lane2")).isFalse(); + assertThat(tsfActiveLane.isActiveLane("lane3")).isFalse(); + } + + @Test + public void testGetCurrentGroupLaneIds() { + // Given + Set currentGroupLaneIds = new TreeSet<>(Arrays.asList("lane1", "lane2")); + ReflectionTestUtils.setField(tsfActiveLane, "currentGroupLaneIds", currentGroupLaneIds); + + // When + Set result = tsfActiveLane.getCurrentGroupLaneIds(); + + // Then + assertThat(result).containsExactly("lane1", "lane2"); + } + + @Test + public void testFreshWhenInstancesChangeWithEmptyList() { + // Given + List emptyInstances = Collections.emptyList(); + + // When + ReflectionTestUtils.invokeMethod(tsfActiveLane, "freshWhenInstancesChange", emptyInstances); + + // Then + // Should handle empty list gracefully without exceptions + } + + @Test + public void testFreshWhenInstancesChangeWithValidInstances() { + // Given + Instance instance = mock(Instance.class); + when(instance.getMetadata()).thenReturn(createMetadata("test-namespace", "group1", "test-app")); + + List instances = Collections.singletonList(instance); + + // When + ReflectionTestUtils.invokeMethod(tsfActiveLane, "freshWhenInstancesChange", instances); + + // Then + Set activeGroupSet = (Set) ReflectionTestUtils.getField(tsfActiveLane, "activeGroupSet"); + assertThat(activeGroupSet).contains("group1"); + } + + private Map createMetadata(String namespaceId, String groupId, String applicationId) { + Map metadata = new HashMap<>(); + metadata.put("TSF_NAMESPACE_ID", namespaceId); + metadata.put("TSF_GROUP_ID", groupId); + metadata.put("TSF_APPLICATION_ID", applicationId); + return metadata; + } + + private Map createMetadataMap(String value) { + Map metadataMap = new HashMap<>(); + metadataMap.put("test-namespace,test-app", value); + return metadataMap; + } +} diff --git a/spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-kafka-lane-plugin/src/test/java/com/tencent/cloud/plugin/kafka/tsf/TsfLaneRuleListenerTest.java b/spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-kafka-lane-plugin/src/test/java/com/tencent/cloud/plugin/kafka/tsf/TsfLaneRuleListenerTest.java new file mode 100644 index 000000000..d3726b3eb --- /dev/null +++ b/spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-kafka-lane-plugin/src/test/java/com/tencent/cloud/plugin/kafka/tsf/TsfLaneRuleListenerTest.java @@ -0,0 +1,167 @@ +/* + * Tencent is pleased to support the open source community by making spring-cloud-tencent available. + * + * Copyright (C) 2021 Tencent. All rights reserved. + * + * Licensed under the BSD 3-Clause License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://opensource.org/licenses/BSD-3-Clause + * + * Unless required by applicable law or agreed to in writing, software distributed + * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package com.tencent.cloud.plugin.kafka.tsf; + +import com.tencent.polaris.api.pojo.RegistryCacheValue; +import com.tencent.polaris.api.pojo.ServiceEventKey; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +/** + * Test for {@link TsfLaneRuleListener}. + */ +public class TsfLaneRuleListenerTest { + + private TsfActiveLane tsfActiveLane; + private TsfLaneRuleListener tsfLaneRuleListener; + + @BeforeEach + public void setUp() { + tsfActiveLane = mock(TsfActiveLane.class); + tsfLaneRuleListener = new TsfLaneRuleListener(tsfActiveLane); + } + + @Test + public void testConstructorInitialization() { + // Verify that constructor properly initializes the TsfActiveLane dependency + assert tsfLaneRuleListener != null; + } + + @Test + public void testOnResourceAddWithLaneRuleEvent() { + // Given + ServiceEventKey svcEventKey = mock(ServiceEventKey.class); + RegistryCacheValue newValue = mock(RegistryCacheValue.class); + + when(svcEventKey.getEventType()).thenReturn(ServiceEventKey.EventType.LANE_RULE); + + // When + tsfLaneRuleListener.onResourceAdd(svcEventKey, newValue); + + // Then + verify(tsfActiveLane).freshLaneStatus(); + } + + @Test + public void testOnResourceAddWithNonLaneRuleEvent() { + // Given + ServiceEventKey svcEventKey = mock(ServiceEventKey.class); + RegistryCacheValue newValue = mock(RegistryCacheValue.class); + + when(svcEventKey.getEventType()).thenReturn(ServiceEventKey.EventType.INSTANCE); + + // When + tsfLaneRuleListener.onResourceAdd(svcEventKey, newValue); + + // Then + verify(tsfActiveLane, never()).freshLaneStatus(); + } + + @Test + public void testOnResourceUpdatedWithLaneRuleEvent() { + // Given + ServiceEventKey svcEventKey = mock(ServiceEventKey.class); + RegistryCacheValue oldValue = mock(RegistryCacheValue.class); + RegistryCacheValue newValue = mock(RegistryCacheValue.class); + + when(svcEventKey.getEventType()).thenReturn(ServiceEventKey.EventType.LANE_RULE); + + // When + tsfLaneRuleListener.onResourceUpdated(svcEventKey, oldValue, newValue); + + // Then + verify(tsfActiveLane).freshLaneStatus(); + } + + @Test + public void testOnResourceUpdatedWithNonLaneRuleEvent() { + // Given + ServiceEventKey svcEventKey = mock(ServiceEventKey.class); + RegistryCacheValue oldValue = mock(RegistryCacheValue.class); + RegistryCacheValue newValue = mock(RegistryCacheValue.class); + + when(svcEventKey.getEventType()).thenReturn(ServiceEventKey.EventType.SERVICE); + + // When + tsfLaneRuleListener.onResourceUpdated(svcEventKey, oldValue, newValue); + + // Then + verify(tsfActiveLane, never()).freshLaneStatus(); + } + + @Test + public void testOnResourceDeletedWithLaneRuleEvent() { + // Given + ServiceEventKey svcEventKey = mock(ServiceEventKey.class); + RegistryCacheValue oldValue = mock(RegistryCacheValue.class); + + when(svcEventKey.getEventType()).thenReturn(ServiceEventKey.EventType.LANE_RULE); + + // When + tsfLaneRuleListener.onResourceDeleted(svcEventKey, oldValue); + + // Then + verify(tsfActiveLane).freshLaneStatus(); + } + + @Test + public void testOnResourceDeletedWithNonLaneRuleEvent() { + // Given + ServiceEventKey svcEventKey = mock(ServiceEventKey.class); + RegistryCacheValue oldValue = mock(RegistryCacheValue.class); + + when(svcEventKey.getEventType()).thenReturn(ServiceEventKey.EventType.ROUTING); + + // When + tsfLaneRuleListener.onResourceDeleted(svcEventKey, oldValue); + + // Then + verify(tsfActiveLane, never()).freshLaneStatus(); + } + + @Test + public void testEventTypeCoverage() { + // Test all event types to ensure proper filtering + for (ServiceEventKey.EventType eventType : ServiceEventKey.EventType.values()) { + ServiceEventKey svcEventKey = mock(ServiceEventKey.class); + RegistryCacheValue value = mock(RegistryCacheValue.class); + + when(svcEventKey.getEventType()).thenReturn(eventType); + + // When + tsfLaneRuleListener.onResourceAdd(svcEventKey, value); + + // Then + if (eventType == ServiceEventKey.EventType.LANE_RULE) { + verify(tsfActiveLane).freshLaneStatus(); + } + else { + verify(tsfActiveLane, never()).freshLaneStatus(); + } + + // Reset mock for next iteration + Mockito.reset(tsfActiveLane); + } + } +}