diff --git a/CHANGELOG.md b/CHANGELOG.md index 9a2e21eb7..ca50588fd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,3 +21,4 @@ - [fix: fix lane router property name.](https://github.com/Tencent/spring-cloud-tencent/pull/1789) - [feat: support kafka lane.](https://github.com/Tencent/spring-cloud-tencent/pull/1765) - [fix: ApplicationContextAwareUtils may not be ready in postProcessAfterInitialization.](https://github.com/Tencent/spring-cloud-tencent/pull/1779) +- [fix: kafka lane support dynamic lane tag.](https://github.com/Tencent/spring-cloud-tencent/pull/1784) diff --git a/spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-mq-plugin/src/main/java/com/tencent/cloud/plugin/mq/lane/AbstractActiveLane.java b/spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-mq-plugin/src/main/java/com/tencent/cloud/plugin/mq/lane/AbstractActiveLane.java new file mode 100644 index 000000000..e978dfa61 --- /dev/null +++ b/spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-mq-plugin/src/main/java/com/tencent/cloud/plugin/mq/lane/AbstractActiveLane.java @@ -0,0 +1,53 @@ +/* + * Tencent is pleased to support the open source community by making spring-cloud-tencent available. + * + * Copyright (C) 2021 Tencent. All rights reserved. + * + * Licensed under the BSD 3-Clause License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://opensource.org/licenses/BSD-3-Clause + * + * Unless required by applicable law or agreed to in writing, software distributed + * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package com.tencent.cloud.plugin.mq.lane; + +import java.util.List; + +import com.tencent.cloud.common.metadata.MetadataContext; +import com.tencent.polaris.api.plugin.registry.AbstractResourceEventListener; +import com.tencent.polaris.api.pojo.Instance; +import com.tencent.polaris.api.pojo.RegistryCacheValue; +import com.tencent.polaris.api.pojo.ServiceEventKey; +import com.tencent.polaris.api.utils.StringUtils; +import com.tencent.polaris.client.pojo.ServiceInstancesByProto; + +public abstract class AbstractActiveLane extends AbstractResourceEventListener { + + public abstract boolean ifConsume(String laneId, MqLaneProperties mqLaneProperties); + + public abstract String getLaneHeaderKey(); + + public String formatLaneId(String laneId) { + return laneId; + } + + public abstract void callback(List currentServiceInstances); + + @Override + public void onResourceUpdated(ServiceEventKey svcEventKey, RegistryCacheValue oldValue, RegistryCacheValue newValue) { + + if (newValue.getEventType() == ServiceEventKey.EventType.INSTANCE + && newValue instanceof ServiceInstancesByProto + && StringUtils.equals(svcEventKey.getService(), MetadataContext.LOCAL_SERVICE)) { + + ServiceInstancesByProto newIns = (ServiceInstancesByProto) newValue; + callback(newIns.getInstances()); + } + } +} diff --git a/spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-mq-plugin/src/main/java/com/tencent/cloud/plugin/mq/lane/tsf/TsfLaneRuleListener.java b/spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-mq-plugin/src/main/java/com/tencent/cloud/plugin/mq/lane/LaneRuleListener.java similarity index 80% rename from spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-mq-plugin/src/main/java/com/tencent/cloud/plugin/mq/lane/tsf/TsfLaneRuleListener.java rename to spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-mq-plugin/src/main/java/com/tencent/cloud/plugin/mq/lane/LaneRuleListener.java index 48ca95eaa..ad46e64c0 100644 --- a/spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-mq-plugin/src/main/java/com/tencent/cloud/plugin/mq/lane/tsf/TsfLaneRuleListener.java +++ b/spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-mq-plugin/src/main/java/com/tencent/cloud/plugin/mq/lane/LaneRuleListener.java @@ -15,18 +15,18 @@ * specific language governing permissions and limitations under the License. */ -package com.tencent.cloud.plugin.mq.lane.tsf; +package com.tencent.cloud.plugin.mq.lane; import com.tencent.polaris.api.plugin.registry.AbstractResourceEventListener; import com.tencent.polaris.api.pojo.RegistryCacheValue; import com.tencent.polaris.api.pojo.ServiceEventKey; -public class TsfLaneRuleListener extends AbstractResourceEventListener { +public class LaneRuleListener extends AbstractResourceEventListener { - private final TsfActiveLane tsfActiveLane; + private final Runnable refreshAction; - public TsfLaneRuleListener(TsfActiveLane tsfActiveLane) { - this.tsfActiveLane = tsfActiveLane; + public LaneRuleListener(Runnable refreshAction) { + this.refreshAction = refreshAction; } @Override @@ -34,8 +34,7 @@ public class TsfLaneRuleListener extends AbstractResourceEventListener { if (svcEventKey.getEventType() != ServiceEventKey.EventType.LANE_RULE) { return; } - - tsfActiveLane.freshLaneStatus(); + refreshAction.run(); } @Override @@ -43,8 +42,7 @@ public class TsfLaneRuleListener extends AbstractResourceEventListener { if (svcEventKey.getEventType() != ServiceEventKey.EventType.LANE_RULE) { return; } - - tsfActiveLane.freshLaneStatus(); + refreshAction.run(); } @Override @@ -52,7 +50,6 @@ public class TsfLaneRuleListener extends AbstractResourceEventListener { if (svcEventKey.getEventType() != ServiceEventKey.EventType.LANE_RULE) { return; } - - tsfActiveLane.freshLaneStatus(); + refreshAction.run(); } } diff --git a/spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-mq-plugin/src/main/java/com/tencent/cloud/plugin/mq/lane/MqLaneProperties.java b/spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-mq-plugin/src/main/java/com/tencent/cloud/plugin/mq/lane/MqLaneProperties.java new file mode 100644 index 000000000..c88dc5017 --- /dev/null +++ b/spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-mq-plugin/src/main/java/com/tencent/cloud/plugin/mq/lane/MqLaneProperties.java @@ -0,0 +1,29 @@ +/* + * Tencent is pleased to support the open source community by making spring-cloud-tencent available. + * + * Copyright (C) 2021 Tencent. All rights reserved. + * + * Licensed under the BSD 3-Clause License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://opensource.org/licenses/BSD-3-Clause + * + * Unless required by applicable law or agreed to in writing, software distributed + * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package com.tencent.cloud.plugin.mq.lane; + +public interface MqLaneProperties { + + Boolean getLaneOn(); + + Boolean getLaneConsumeMain(); + + Boolean getMainConsumeLane(); + + Boolean getAutoSetHeader(); +} diff --git a/spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-mq-plugin/src/main/java/com/tencent/cloud/plugin/mq/lane/PolarisActiveLane.java b/spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-mq-plugin/src/main/java/com/tencent/cloud/plugin/mq/lane/PolarisActiveLane.java new file mode 100644 index 000000000..c736f34e1 --- /dev/null +++ b/spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-mq-plugin/src/main/java/com/tencent/cloud/plugin/mq/lane/PolarisActiveLane.java @@ -0,0 +1,210 @@ +/* + * Tencent is pleased to support the open source community by making spring-cloud-tencent available. + * + * Copyright (C) 2021 Tencent. All rights reserved. + * + * Licensed under the BSD 3-Clause License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://opensource.org/licenses/BSD-3-Clause + * + * Unless required by applicable law or agreed to in writing, software distributed + * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package com.tencent.cloud.plugin.mq.lane; + +import java.util.Collections; +import java.util.List; +import java.util.Optional; + +import com.tencent.cloud.common.metadata.MetadataContext; +import com.tencent.cloud.common.util.JacksonUtils; +import com.tencent.cloud.polaris.context.PolarisSDKContextManager; +import com.tencent.cloud.polaris.discovery.PolarisDiscoveryHandler; +import com.tencent.polaris.api.config.Configuration; +import com.tencent.polaris.api.config.consumer.ConsumerConfig; +import com.tencent.polaris.api.config.consumer.ServiceRouterConfig; +import com.tencent.polaris.api.plugin.compose.Extensions; +import com.tencent.polaris.api.pojo.Instance; +import com.tencent.polaris.api.pojo.ServiceKey; +import com.tencent.polaris.api.utils.CollectionUtils; +import com.tencent.polaris.api.utils.StringUtils; +import com.tencent.polaris.client.api.SDKContext; +import com.tencent.polaris.plugins.router.lane.BaseLaneMode; +import com.tencent.polaris.plugins.router.lane.LaneRouter; +import com.tencent.polaris.plugins.router.lane.LaneRouterConfig; +import com.tencent.polaris.plugins.router.lane.LaneUtils; +import com.tencent.polaris.specification.api.v1.traffic.manage.LaneProto; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.springframework.beans.factory.InitializingBean; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.cloud.client.serviceregistry.Registration; + + +public class PolarisActiveLane extends AbstractActiveLane implements InitializingBean { + + private static final Logger LOG = LoggerFactory.getLogger(PolarisActiveLane.class); + + private final PolarisSDKContextManager polarisSDKContextManager; + + private final PolarisDiscoveryHandler discoveryClient; + + @Value("${spring.application.name:}") + private String springApplicationName; + /** + * current instance lane tag, related to baseLaneMode. + */ + private volatile String instanceLaneTag = ""; + + private volatile boolean serviceInLane = false; + + private volatile List groups; + + private Registration registration; + + private BaseLaneMode baseLaneMode = BaseLaneMode.ONLY_UNTAGGED_INSTANCE; + + public PolarisActiveLane(PolarisSDKContextManager polarisSDKContextManager, + PolarisDiscoveryHandler discoveryClient, Registration registration) { + this.polarisSDKContextManager = polarisSDKContextManager; + this.discoveryClient = discoveryClient; + this.registration = registration; + Optional.ofNullable(polarisSDKContextManager).map(PolarisSDKContextManager::getSDKContext) + .map(SDKContext::getExtensions).map(Extensions::getLocalRegistry) + .ifPresent(localRegistry -> localRegistry.registerResourceListener(new LaneRuleListener(this::freshLaneStatus))); + + Optional.ofNullable(polarisSDKContextManager).map(PolarisSDKContextManager::getSDKContext) + .map(SDKContext::getExtensions).map(Extensions::getLocalRegistry) + .ifPresent(localRegistry -> localRegistry.registerResourceListener(this)); + + Optional.ofNullable(polarisSDKContextManager).map(PolarisSDKContextManager::getSDKContext) + .map(SDKContext::getConfig).map(Configuration::getConsumer).map(ConsumerConfig::getServiceRouter).ifPresent(serviceRouterConfig -> { + LaneRouterConfig laneRouterConfig = serviceRouterConfig.getPluginConfig(ServiceRouterConfig.DEFAULT_ROUTER_LANE, LaneRouterConfig.class); + if (laneRouterConfig != null) { + baseLaneMode = laneRouterConfig.getBaseLaneMode(); + } + }); + } + + @Override + public void afterPropertiesSet() { + // get instances to trigger callback when instances change + discoveryClient.getHealthyInstances(springApplicationName); + } + + @Override + public void callback(List currentServiceInstances) { + if (LOG.isDebugEnabled()) { + LOG.debug("currentServices: {}", JacksonUtils.serialize2Json(currentServiceInstances)); + } + + freshWhenInstancesChange(currentServiceInstances); + + if (LOG.isDebugEnabled()) { + LOG.debug("current instanceLaneTag:{}, serviceInLane: {}, baseLaneMode:{}", instanceLaneTag, serviceInLane, baseLaneMode); + } + } + + private void freshWhenInstancesChange(List currentServices) { + freshLaneStatus(); + + String tempInstanceLaneTag = ""; + // get all active groups + if (CollectionUtils.isNotEmpty(currentServices)) { + for (Instance healthService : currentServices) { + if (StringUtils.equals(healthService.getId(), registration.getInstanceId())) { + tempInstanceLaneTag = healthService.getMetadata().get("lane"); + break; + } + } + } + + if (BaseLaneMode.ONLY_UNTAGGED_INSTANCE.equals(baseLaneMode)) { + instanceLaneTag = tempInstanceLaneTag; + } + else { + // if baseLaneMode is EXCLUDE_ENABLED_LANE_INSTANCE, check if the instance lane tag is in the lane + boolean laneTagExist = false; + for (LaneProto.LaneGroup group : getGroups()) { + for (LaneProto.LaneRule rule : group.getRulesList()) { + if (StringUtils.equals(rule.getDefaultLabelValue(), tempInstanceLaneTag)) { + laneTagExist = true; + } + } + } + instanceLaneTag = laneTagExist ? tempInstanceLaneTag : ""; + } + } + + /** + * update lane status. + */ + public void freshLaneStatus() { + + ServiceKey localService = new ServiceKey(com.tencent.cloud.common.metadata.MetadataContext.LOCAL_NAMESPACE, + com.tencent.cloud.common.metadata.MetadataContext.LOCAL_SERVICE); + + groups = LaneUtils.getLaneGroups(localService, polarisSDKContextManager.getSDKContext().getExtensions()); + + serviceInLane = CollectionUtils.isNotEmpty(groups); + } + + public boolean currentInstanceInLane() { + return StringUtils.isNotEmpty(instanceLaneTag) && serviceInLane; + } + + public String getInstanceLaneTag() { + return instanceLaneTag; + } + + public List getGroups() { + return groups == null ? Collections.emptyList() : groups; + } + + @Override + public boolean ifConsume(String messageLaneId, MqLaneProperties mqLaneProperties) { + // message has no lane id + if (StringUtils.isEmpty(messageLaneId)) { + if (!currentInstanceInLane()) { + // baseline service, consume directly + return true; + } + else { + // lane listener consumes baseline message + return mqLaneProperties.getLaneConsumeMain(); + } + } + else { + LaneUtils.setCallerLaneId(messageLaneId); + + // message has lane id + if (!currentInstanceInLane()) { + // baseline service + return mqLaneProperties.getMainConsumeLane(); + } + else { + // whether the message lane id is the same as the lane id of the listener + for (LaneProto.LaneGroup group : getGroups()) { + for (LaneProto.LaneRule rule : group.getRulesList()) { + if (StringUtils.equals(messageLaneId, LaneUtils.buildStainLabel(rule)) + && StringUtils.equals(rule.getDefaultLabelValue(), getInstanceLaneTag())) { + return true; + } + } + } + return false; + } + } + } + + @Override + public String getLaneHeaderKey() { + return MetadataContext.DEFAULT_TRANSITIVE_PREFIX + LaneRouter.TRAFFIC_STAIN_LABEL; + } +} diff --git a/spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-mq-plugin/src/main/java/com/tencent/cloud/plugin/mq/lane/kafka/KafkaLaneAspect.java b/spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-mq-plugin/src/main/java/com/tencent/cloud/plugin/mq/lane/kafka/KafkaLaneAspect.java index e24e8f371..1bf6e5fec 100644 --- a/spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-mq-plugin/src/main/java/com/tencent/cloud/plugin/mq/lane/kafka/KafkaLaneAspect.java +++ b/spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-mq-plugin/src/main/java/com/tencent/cloud/plugin/mq/lane/kafka/KafkaLaneAspect.java @@ -19,23 +19,16 @@ package com.tencent.cloud.plugin.mq.lane.kafka; import java.nio.charset.StandardCharsets; import java.util.ArrayList; -import java.util.Collection; import java.util.Iterator; import java.util.List; import java.util.Optional; -import java.util.Set; -import com.tencent.cloud.common.metadata.MetadataContext; import com.tencent.cloud.common.metadata.MetadataContextHolder; -import com.tencent.cloud.common.tsf.TsfContextUtils; -import com.tencent.cloud.plugin.mq.lane.tsf.TsfActiveLane; +import com.tencent.cloud.plugin.mq.lane.AbstractActiveLane; import com.tencent.cloud.polaris.context.PolarisSDKContextManager; -import com.tencent.polaris.api.pojo.ServiceKey; import com.tencent.polaris.api.utils.StringUtils; import com.tencent.polaris.client.api.SDKContext; -import com.tencent.polaris.plugins.router.lane.LaneRouter; import com.tencent.polaris.plugins.router.lane.LaneUtils; -import com.tencent.polaris.specification.api.v1.traffic.manage.LaneProto; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.header.Header; @@ -47,7 +40,6 @@ import org.aspectj.lang.annotation.Pointcut; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Value; import org.springframework.core.annotation.Order; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.Acknowledgment; @@ -64,24 +56,19 @@ public class KafkaLaneAspect { */ public static final Object EMPTY_OBJECT = new Object(); - private static final String TSF_LANE_ID = "tsf_laneId"; - private final PolarisSDKContextManager polarisSDKContextManager; private final KafkaLaneProperties kafkaLaneProperties; - private final TsfActiveLane tsfActiveLane; + private final AbstractActiveLane activeLane; private final String laneHeaderKey; - @Value("${spring.cloud.tencent.metadata.content.lane:}") - private String lane; - - public KafkaLaneAspect(PolarisSDKContextManager polarisSDKContextManager, KafkaLaneProperties kafkaLaneProperties, TsfActiveLane tsfActiveLane) { + public KafkaLaneAspect(PolarisSDKContextManager polarisSDKContextManager, KafkaLaneProperties kafkaLaneProperties, AbstractActiveLane activeLane) { this.polarisSDKContextManager = polarisSDKContextManager; this.kafkaLaneProperties = kafkaLaneProperties; - this.tsfActiveLane = tsfActiveLane; - laneHeaderKey = TsfContextUtils.isOnlyTsfConsulEnabled() ? TSF_LANE_ID : MetadataContext.DEFAULT_TRANSITIVE_PREFIX + LaneRouter.TRAFFIC_STAIN_LABEL; + this.activeLane = activeLane; + laneHeaderKey = activeLane.getLaneHeaderKey(); } @Pointcut("execution(* org.springframework.kafka.core.KafkaTemplate.send(..))") @@ -263,11 +250,9 @@ public class KafkaLaneAspect { // falls back to the laneId from the metadata context (set by the user's aspect) if the header is empty if (StringUtils.isBlank(laneId)) { laneId = LaneUtils.getCallerLaneId(); - // format laneId - if (laneId != null && !laneId.contains("/") && laneId.startsWith("lane-")) { - laneId = "tsf/" + laneId; - } } + // format laneId + laneId = activeLane.formatLaneId(laneId); return laneId; } @@ -277,91 +262,6 @@ public class KafkaLaneAspect { * @return whether to consume. */ boolean ifConsume(String messageLaneId) { - if (TsfContextUtils.isOnlyTsfConsulEnabled() && tsfActiveLane != null) { - return ifConsumeInTsf(messageLaneId); - } - else { - return ifConsumeInPolaris(messageLaneId); - } - } - - private boolean ifConsumeInTsf(String originMessageLaneId) { - String laneId = originMessageLaneId; - if (laneId != null && laneId.contains("/")) { - laneId = laneId.split("/")[1]; - } - - Set groupLaneIdSet = tsfActiveLane.getCurrentGroupLaneIds(); - // message has no lane id - if (StringUtils.isEmpty(laneId)) { - if (groupLaneIdSet.isEmpty()) { - // baseline service, consume directly - return true; - } - else { - // lane listener consumes baseline message - return this.kafkaLaneProperties.getLaneConsumeMain(); - } - } - else { - LaneUtils.setCallerLaneId(originMessageLaneId); - - // message has lane id - if (groupLaneIdSet.isEmpty()) { - // baseline service - // message carries lane id but the current service's lane has no deployment groups, consume baseline - boolean consume = !tsfActiveLane.isLaneExist(laneId); - - // message carries lane id, but the current service's lane has deployment groups but is not active (or manually taken offline), consume baseline based on switch configuration, default is not to consume - consume = consume || - (this.kafkaLaneProperties.getMainConsumeLane() && - tsfActiveLane.isLaneExist(laneId) && - !tsfActiveLane.isActiveLane(laneId) - ); - return consume; - } - else { - return groupLaneIdSet.contains(laneId); - } - } - } - - private boolean ifConsumeInPolaris(String messageLaneId) { - // message has no lane id - if (StringUtils.isEmpty(messageLaneId)) { - if (StringUtils.isEmpty(lane)) { - // baseline service, consume directly - return true; - } - else { - // lane listener consumes baseline message - return this.kafkaLaneProperties.getLaneConsumeMain(); - } - } - else { - LaneUtils.setCallerLaneId(messageLaneId); - - // message has lane id - if (StringUtils.isEmpty(lane)) { - // baseline service - return this.kafkaLaneProperties.getMainConsumeLane(); - } - else { - ServiceKey localService = new ServiceKey(com.tencent.cloud.common.metadata.MetadataContext.LOCAL_NAMESPACE, - com.tencent.cloud.common.metadata.MetadataContext.LOCAL_SERVICE); - - Collection groups = LaneUtils.getLaneGroups(localService, polarisSDKContextManager.getSDKContext().getExtensions()); - // whether the message lane id is the same as the lane id of the listener - for (LaneProto.LaneGroup group : groups) { - for (LaneProto.LaneRule rule : group.getRulesList()) { - if (StringUtils.equals(messageLaneId, LaneUtils.buildStainLabel(rule)) - && StringUtils.equals(rule.getDefaultLabelValue(), lane)) { - return true; - } - } - } - return false; - } - } + return activeLane.ifConsume(messageLaneId, kafkaLaneProperties); } } diff --git a/spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-mq-plugin/src/main/java/com/tencent/cloud/plugin/mq/lane/kafka/KafkaLaneAspectConfiguration.java b/spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-mq-plugin/src/main/java/com/tencent/cloud/plugin/mq/lane/kafka/KafkaLaneAspectConfiguration.java index b6ffd6da2..b9b727b2b 100644 --- a/spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-mq-plugin/src/main/java/com/tencent/cloud/plugin/mq/lane/kafka/KafkaLaneAspectConfiguration.java +++ b/spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-mq-plugin/src/main/java/com/tencent/cloud/plugin/mq/lane/kafka/KafkaLaneAspectConfiguration.java @@ -17,15 +17,17 @@ package com.tencent.cloud.plugin.mq.lane.kafka; -import com.tencent.cloud.common.tsf.ConditionalOnOnlyTsfConsulEnabled; +import com.tencent.cloud.common.tsf.TsfContextUtils; +import com.tencent.cloud.plugin.mq.lane.AbstractActiveLane; +import com.tencent.cloud.plugin.mq.lane.PolarisActiveLane; import com.tencent.cloud.plugin.mq.lane.tsf.TsfActiveLane; import com.tencent.cloud.polaris.context.PolarisSDKContextManager; import com.tencent.cloud.polaris.discovery.PolarisDiscoveryHandler; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.cloud.client.serviceregistry.Registration; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -36,15 +38,21 @@ public class KafkaLaneAspectConfiguration { @Bean @ConditionalOnClass(name = {"org.springframework.kafka.core.KafkaTemplate"}) public KafkaLaneAspect kafkaLaneAspect(PolarisSDKContextManager polarisSDKContextManager, - KafkaLaneProperties kafkaLaneProperties, @Autowired(required = false) TsfActiveLane tsfActiveLane) { - return new KafkaLaneAspect(polarisSDKContextManager, kafkaLaneProperties, tsfActiveLane); + KafkaLaneProperties kafkaLaneProperties, + AbstractActiveLane activeLane) { + return new KafkaLaneAspect(polarisSDKContextManager, kafkaLaneProperties, activeLane); } @Bean @ConditionalOnClass(name = {"org.springframework.kafka.core.KafkaTemplate"}) @ConditionalOnMissingBean - @ConditionalOnOnlyTsfConsulEnabled - public TsfActiveLane tsfActiveLane(PolarisSDKContextManager polarisSDKContextManager, PolarisDiscoveryHandler discoveryClient) { - return new TsfActiveLane(polarisSDKContextManager, discoveryClient); + public AbstractActiveLane activeLane(PolarisSDKContextManager polarisSDKContextManager, + PolarisDiscoveryHandler discoveryClient, Registration registration) { + if (TsfContextUtils.isOnlyTsfConsulEnabled()) { + return new TsfActiveLane(polarisSDKContextManager, discoveryClient); + } + else { + return new PolarisActiveLane(polarisSDKContextManager, discoveryClient, registration); + } } } diff --git a/spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-mq-plugin/src/main/java/com/tencent/cloud/plugin/mq/lane/kafka/KafkaLaneProperties.java b/spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-mq-plugin/src/main/java/com/tencent/cloud/plugin/mq/lane/kafka/KafkaLaneProperties.java index 0f0e7a67b..e77ca93a7 100644 --- a/spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-mq-plugin/src/main/java/com/tencent/cloud/plugin/mq/lane/kafka/KafkaLaneProperties.java +++ b/spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-mq-plugin/src/main/java/com/tencent/cloud/plugin/mq/lane/kafka/KafkaLaneProperties.java @@ -17,10 +17,12 @@ package com.tencent.cloud.plugin.mq.lane.kafka; +import com.tencent.cloud.plugin.mq.lane.MqLaneProperties; + import org.springframework.boot.context.properties.ConfigurationProperties; @ConfigurationProperties(prefix = "spring.cloud.polaris.lane.kafka") -public class KafkaLaneProperties { +public class KafkaLaneProperties implements MqLaneProperties { /** * enable kafka lane. diff --git a/spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-mq-plugin/src/main/java/com/tencent/cloud/plugin/mq/lane/tsf/TsfActiveLane.java b/spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-mq-plugin/src/main/java/com/tencent/cloud/plugin/mq/lane/tsf/TsfActiveLane.java index bbef55288..dd40ca145 100644 --- a/spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-mq-plugin/src/main/java/com/tencent/cloud/plugin/mq/lane/tsf/TsfActiveLane.java +++ b/spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-mq-plugin/src/main/java/com/tencent/cloud/plugin/mq/lane/tsf/TsfActiveLane.java @@ -25,10 +25,11 @@ import java.util.Optional; import java.util.Set; import com.tencent.cloud.common.util.JacksonUtils; +import com.tencent.cloud.plugin.mq.lane.AbstractActiveLane; +import com.tencent.cloud.plugin.mq.lane.LaneRuleListener; +import com.tencent.cloud.plugin.mq.lane.MqLaneProperties; import com.tencent.cloud.polaris.context.PolarisSDKContextManager; import com.tencent.cloud.polaris.discovery.PolarisDiscoveryHandler; -import com.tencent.cloud.polaris.discovery.refresh.ServiceInstanceChangeCallback; -import com.tencent.cloud.polaris.discovery.refresh.ServiceInstanceChangeListener; import com.tencent.polaris.api.plugin.compose.Extensions; import com.tencent.polaris.api.pojo.Instance; import com.tencent.polaris.api.pojo.ServiceKey; @@ -44,20 +45,22 @@ import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Value; -@ServiceInstanceChangeListener(serviceName = "${spring.application.name}") -public class TsfActiveLane implements ServiceInstanceChangeCallback, InitializingBean { +public class TsfActiveLane extends AbstractActiveLane implements InitializingBean { private static final Logger LOG = LoggerFactory.getLogger(TsfActiveLane.class); + private static final String TSF_LANE_ID = "tsf_laneId"; + private final PolarisSDKContextManager polarisSDKContextManager; private final PolarisDiscoveryHandler discoveryClient; + /** * Online deployment groups for this service (same namespace id and application id required). */ private volatile Set activeGroupSet = new HashSet<>(); - private volatile Set currentGroupLaneIds = null; + private volatile Set currentGroupLaneIds = new HashSet<>(); /** * key: laneId. * value: true - online, false - offline. @@ -81,7 +84,11 @@ public class TsfActiveLane implements ServiceInstanceChangeCallback, Initializin this.discoveryClient = discoveryClient; Optional.ofNullable(polarisSDKContextManager).map(PolarisSDKContextManager::getSDKContext) .map(SDKContext::getExtensions).map(Extensions::getLocalRegistry) - .ifPresent(localRegistry -> localRegistry.registerResourceListener(new TsfLaneRuleListener(this))); + .ifPresent(localRegistry -> localRegistry.registerResourceListener(new LaneRuleListener(this::freshLaneStatus))); + + Optional.ofNullable(polarisSDKContextManager).map(PolarisSDKContextManager::getSDKContext) + .map(SDKContext::getExtensions).map(Extensions::getLocalRegistry) + .ifPresent(localRegistry -> localRegistry.registerResourceListener(this)); } @Override @@ -91,16 +98,16 @@ public class TsfActiveLane implements ServiceInstanceChangeCallback, Initializin } @Override - public void callback(List currentServiceInstances, List addServiceInstances, List deleteServiceInstances) { + public void callback(List currentServiceInstances) { if (LOG.isDebugEnabled()) { - LOG.debug("ConsulServiceChangeCallback currentServices: {}", JacksonUtils.serialize2Json(currentServiceInstances)); + LOG.debug("currentServices: {}", JacksonUtils.serialize2Json(currentServiceInstances)); LOG.debug("current namespaceId: {}, groupId: {}, applicationId: {}", tsfNamespaceId, tsfGroupId, tsfApplicationId); } freshWhenInstancesChange(currentServiceInstances); if (LOG.isDebugEnabled()) { - LOG.info("current lane active status: {}", JacksonUtils.serialize2Json(laneActiveMap)); + LOG.debug("current lane active status: {}", JacksonUtils.serialize2Json(laneActiveMap)); } } @@ -116,7 +123,7 @@ public class TsfActiveLane implements ServiceInstanceChangeCallback, Initializin String nsId = healthService.getMetadata().get("TSF_NAMESPACE_ID"); String groupId = healthService.getMetadata().get("TSF_GROUP_ID"); String applicationId = healthService.getMetadata().get("TSF_APPLICATION_ID"); - if (tsfNamespaceId.equals(nsId) && tsfApplicationId.equals(applicationId) && StringUtils.isNotEmpty(groupId)) { + if (StringUtils.equals(tsfNamespaceId, nsId) && StringUtils.equals(tsfApplicationId, applicationId) && StringUtils.isNotEmpty(groupId)) { currentActiveGroupSet.add(groupId); } } @@ -183,4 +190,61 @@ public class TsfActiveLane implements ServiceInstanceChangeCallback, Initializin return currentGroupLaneIds; } + @Override + public boolean ifConsume(String originMessageLaneId, MqLaneProperties mqLaneProperties) { + String laneId = originMessageLaneId; + if (laneId != null && laneId.contains("/")) { + laneId = laneId.split("/")[1]; + } + + Set groupLaneIdSet = getCurrentGroupLaneIds(); + // message has no lane id + if (StringUtils.isEmpty(laneId)) { + if (groupLaneIdSet.isEmpty()) { + // baseline service, consume directly + return true; + } + else { + // lane listener consumes baseline message + return mqLaneProperties.getLaneConsumeMain(); + } + } + else { + LaneUtils.setCallerLaneId(originMessageLaneId); + + // message has lane id + if (groupLaneIdSet.isEmpty()) { + // baseline service + // message carries lane id but the current service's lane has no deployment groups, consume baseline + boolean consume = !isLaneExist(laneId); + + // message carries lane id, but the current service's lane has deployment groups but is not active (or manually taken offline), consume baseline based on switch configuration, default is not to consume + consume = consume || + (mqLaneProperties.getMainConsumeLane() && + isLaneExist(laneId) && + !isActiveLane(laneId) + ); + return consume; + } + else { + return groupLaneIdSet.contains(laneId); + } + } + } + + @Override + public String getLaneHeaderKey() { + return TSF_LANE_ID; + } + + @Override + public String formatLaneId(String laneId) { + if (StringUtils.isEmpty(laneId)) { + return laneId; + } + if (!laneId.contains("/") && laneId.startsWith("lane-")) { + laneId = "tsf/" + laneId; + } + return laneId; + } } diff --git a/spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-mq-plugin/src/test/java/com/tencent/cloud/plugin/mq/lane/tsf/TsfLaneRuleListenerTest.java b/spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-mq-plugin/src/test/java/com/tencent/cloud/plugin/mq/lane/LaneRuleListenerTest.java similarity index 84% rename from spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-mq-plugin/src/test/java/com/tencent/cloud/plugin/mq/lane/tsf/TsfLaneRuleListenerTest.java rename to spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-mq-plugin/src/test/java/com/tencent/cloud/plugin/mq/lane/LaneRuleListenerTest.java index b8a3bbe41..e2f729721 100644 --- a/spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-mq-plugin/src/test/java/com/tencent/cloud/plugin/mq/lane/tsf/TsfLaneRuleListenerTest.java +++ b/spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-mq-plugin/src/test/java/com/tencent/cloud/plugin/mq/lane/LaneRuleListenerTest.java @@ -15,8 +15,9 @@ * specific language governing permissions and limitations under the License. */ -package com.tencent.cloud.plugin.mq.lane.tsf; +package com.tencent.cloud.plugin.mq.lane; +import com.tencent.cloud.plugin.mq.lane.tsf.TsfActiveLane; import com.tencent.polaris.api.pojo.RegistryCacheValue; import com.tencent.polaris.api.pojo.ServiceEventKey; import org.junit.jupiter.api.BeforeEach; @@ -29,23 +30,23 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; /** - * Test for {@link TsfLaneRuleListener}. + * Test for {@link LaneRuleListener}. */ -public class TsfLaneRuleListenerTest { +public class LaneRuleListenerTest { private TsfActiveLane tsfActiveLane; - private TsfLaneRuleListener tsfLaneRuleListener; + private LaneRuleListener laneRuleListener; @BeforeEach public void setUp() { tsfActiveLane = mock(TsfActiveLane.class); - tsfLaneRuleListener = new TsfLaneRuleListener(tsfActiveLane); + laneRuleListener = new LaneRuleListener(tsfActiveLane::freshLaneStatus); } @Test public void testConstructorInitialization() { // Verify that constructor properly initializes the TsfActiveLane dependency - assert tsfLaneRuleListener != null; + assert laneRuleListener != null; } @Test @@ -57,7 +58,7 @@ public class TsfLaneRuleListenerTest { when(svcEventKey.getEventType()).thenReturn(ServiceEventKey.EventType.LANE_RULE); // When - tsfLaneRuleListener.onResourceAdd(svcEventKey, newValue); + laneRuleListener.onResourceAdd(svcEventKey, newValue); // Then verify(tsfActiveLane).freshLaneStatus(); @@ -72,7 +73,7 @@ public class TsfLaneRuleListenerTest { when(svcEventKey.getEventType()).thenReturn(ServiceEventKey.EventType.INSTANCE); // When - tsfLaneRuleListener.onResourceAdd(svcEventKey, newValue); + laneRuleListener.onResourceAdd(svcEventKey, newValue); // Then verify(tsfActiveLane, never()).freshLaneStatus(); @@ -88,7 +89,7 @@ public class TsfLaneRuleListenerTest { when(svcEventKey.getEventType()).thenReturn(ServiceEventKey.EventType.LANE_RULE); // When - tsfLaneRuleListener.onResourceUpdated(svcEventKey, oldValue, newValue); + laneRuleListener.onResourceUpdated(svcEventKey, oldValue, newValue); // Then verify(tsfActiveLane).freshLaneStatus(); @@ -104,7 +105,7 @@ public class TsfLaneRuleListenerTest { when(svcEventKey.getEventType()).thenReturn(ServiceEventKey.EventType.SERVICE); // When - tsfLaneRuleListener.onResourceUpdated(svcEventKey, oldValue, newValue); + laneRuleListener.onResourceUpdated(svcEventKey, oldValue, newValue); // Then verify(tsfActiveLane, never()).freshLaneStatus(); @@ -119,7 +120,7 @@ public class TsfLaneRuleListenerTest { when(svcEventKey.getEventType()).thenReturn(ServiceEventKey.EventType.LANE_RULE); // When - tsfLaneRuleListener.onResourceDeleted(svcEventKey, oldValue); + laneRuleListener.onResourceDeleted(svcEventKey, oldValue); // Then verify(tsfActiveLane).freshLaneStatus(); @@ -134,7 +135,7 @@ public class TsfLaneRuleListenerTest { when(svcEventKey.getEventType()).thenReturn(ServiceEventKey.EventType.ROUTING); // When - tsfLaneRuleListener.onResourceDeleted(svcEventKey, oldValue); + laneRuleListener.onResourceDeleted(svcEventKey, oldValue); // Then verify(tsfActiveLane, never()).freshLaneStatus(); @@ -150,7 +151,7 @@ public class TsfLaneRuleListenerTest { when(svcEventKey.getEventType()).thenReturn(eventType); // When - tsfLaneRuleListener.onResourceAdd(svcEventKey, value); + laneRuleListener.onResourceAdd(svcEventKey, value); // Then if (eventType == ServiceEventKey.EventType.LANE_RULE) { diff --git a/spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-mq-plugin/src/test/java/com/tencent/cloud/plugin/mq/lane/PolarisActiveLaneTest.java b/spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-mq-plugin/src/test/java/com/tencent/cloud/plugin/mq/lane/PolarisActiveLaneTest.java new file mode 100644 index 000000000..573daaa59 --- /dev/null +++ b/spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-mq-plugin/src/test/java/com/tencent/cloud/plugin/mq/lane/PolarisActiveLaneTest.java @@ -0,0 +1,181 @@ +/* + * Tencent is pleased to support the open source community by making spring-cloud-tencent available. + * + * Copyright (C) 2021 Tencent. All rights reserved. + * + * Licensed under the BSD 3-Clause License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://opensource.org/licenses/BSD-3-Clause + * + * Unless required by applicable law or agreed to in writing, software distributed + * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package com.tencent.cloud.plugin.mq.lane; + +import java.lang.reflect.Field; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import com.tencent.cloud.common.tsf.TsfContextUtils; +import com.tencent.cloud.common.util.JacksonUtils; +import com.tencent.cloud.plugin.mq.lane.kafka.KafkaLaneProperties; +import com.tencent.cloud.polaris.context.PolarisSDKContextManager; +import com.tencent.cloud.polaris.discovery.PolarisDiscoveryHandler; +import com.tencent.polaris.api.pojo.Instance; +import com.tencent.polaris.client.api.SDKContext; +import com.tencent.polaris.plugins.router.lane.BaseLaneMode; +import com.tencent.polaris.plugins.router.lane.LaneUtils; +import com.tencent.polaris.specification.api.v1.traffic.manage.LaneProto; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.MockedStatic; +import org.mockito.Mockito; + +import org.springframework.cloud.client.serviceregistry.Registration; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Test for {@link PolarisActiveLane}. + * Instance and service status change. + */ +public class PolarisActiveLaneTest { + + private static final String CURRENT_INSTANCE_ID = "current-instance-id"; + + private KafkaLaneProperties kafkaLaneProperties; + private PolarisActiveLane polarisActiveLane; + private PolarisSDKContextManager polarisSDKContextManager; + private Registration registration; + private LaneProto.LaneGroup group; + private MockedStatic laneUtilsMockedStatic; + private MockedStatic jacksonUtilsMockedStatic; + private MockedStatic tsfContextUtilsMockedStatic; + + @BeforeEach + public void setUp() throws Exception { + group = mock(LaneProto.LaneGroup.class); + kafkaLaneProperties = new KafkaLaneProperties(); + kafkaLaneProperties.setLaneOn(true); + polarisSDKContextManager = mock(PolarisSDKContextManager.class); + when(polarisSDKContextManager.getSDKContext()).thenReturn(mock(SDKContext.class)); + registration = mock(Registration.class); + when(registration.getInstanceId()).thenReturn(CURRENT_INSTANCE_ID); + polarisActiveLane = new PolarisActiveLane(polarisSDKContextManager, mock(PolarisDiscoveryHandler.class), registration); + laneUtilsMockedStatic = Mockito.mockStatic(LaneUtils.class); + jacksonUtilsMockedStatic = Mockito.mockStatic(JacksonUtils.class); + tsfContextUtilsMockedStatic = Mockito.mockStatic(TsfContextUtils.class); + + jacksonUtilsMockedStatic.when(() -> JacksonUtils.serialize2Json(any())).thenReturn("{}"); + + } + + @AfterEach + public void tearDown() { + laneUtilsMockedStatic.close(); + jacksonUtilsMockedStatic.close(); + tsfContextUtilsMockedStatic.close(); + } + + @Test + public void testCallback() throws Throwable { + // not in lane + assertThat(polarisActiveLane.currentInstanceInLane()).isFalse(); + + // in lane + Map metadata = new HashMap<>(); + metadata.put("lane", "test-lane"); + + Instance instance = mock(Instance.class); + when(instance.getId()).thenReturn(CURRENT_INSTANCE_ID); + when(instance.getMetadata()).thenReturn(metadata); + laneUtilsMockedStatic.when(() -> LaneUtils.getLaneGroups(any(), any())) + .thenReturn(Collections.singletonList(group)); + + polarisActiveLane.callback(Collections.singletonList(instance)); + + assertThat(polarisActiveLane.currentInstanceInLane()).isTrue(); + + // instance not in lane + metadata.remove("lane"); + polarisActiveLane.callback(Collections.singletonList(instance)); + assertThat(polarisActiveLane.currentInstanceInLane()).isFalse(); + + // reset, instance in lane + metadata.put("lane", "test-lane"); + polarisActiveLane.callback(Collections.singletonList(instance)); + assertThat(polarisActiveLane.currentInstanceInLane()).isTrue(); + + // service not in lane + laneUtilsMockedStatic.when(() -> LaneUtils.getLaneGroups(any(), any())).thenReturn(Collections.emptyList()); + polarisActiveLane.freshLaneStatus(); // rule listener will call this method + assertThat(polarisActiveLane.currentInstanceInLane()).isFalse(); + } + + @Test + public void testCallback_baseLaneMode1() throws Throwable { + Field baseLaneModeField = PolarisActiveLane.class.getDeclaredField("baseLaneMode"); + baseLaneModeField.setAccessible(true); + baseLaneModeField.set(polarisActiveLane, BaseLaneMode.EXCLUDE_ENABLED_LANE_INSTANCE); + + // not in lane + assertThat(polarisActiveLane.currentInstanceInLane()).isFalse(); + + // not in lane + Map metadata = new HashMap<>(); + metadata.put("lane", "lane-not-exist"); + + Instance instance = mock(Instance.class); + when(instance.getId()).thenReturn(CURRENT_INSTANCE_ID); + when(instance.getMetadata()).thenReturn(metadata); + laneUtilsMockedStatic.when(() -> LaneUtils.getLaneGroups(any(), any())) + .thenReturn(Collections.singletonList(group)); + + LaneProto.LaneRule rule = mock(LaneProto.LaneRule.class); + when(rule.getDefaultLabelValue()).thenReturn("lane-exist"); + when(group.getRulesList()).thenReturn(Collections.singletonList(rule)); + + polarisActiveLane.callback(Collections.singletonList(instance)); + + assertThat(polarisActiveLane.currentInstanceInLane()).isFalse(); + } + + @Test + public void testCallback_baseLaneMode2() throws Throwable { + Field baseLaneModeField = PolarisActiveLane.class.getDeclaredField("baseLaneMode"); + baseLaneModeField.setAccessible(true); + baseLaneModeField.set(polarisActiveLane, BaseLaneMode.EXCLUDE_ENABLED_LANE_INSTANCE); + + // not in lane + assertThat(polarisActiveLane.currentInstanceInLane()).isFalse(); + + // in lane + Map metadata = new HashMap<>(); + metadata.put("lane", "lane-exist"); + + Instance instance = mock(Instance.class); + when(instance.getId()).thenReturn(CURRENT_INSTANCE_ID); + when(instance.getMetadata()).thenReturn(metadata); + laneUtilsMockedStatic.when(() -> LaneUtils.getLaneGroups(any(), any())) + .thenReturn(Collections.singletonList(group)); + + LaneProto.LaneRule rule = mock(LaneProto.LaneRule.class); + when(rule.getDefaultLabelValue()).thenReturn("lane-exist"); + when(group.getRulesList()).thenReturn(Collections.singletonList(rule)); + + polarisActiveLane.callback(Collections.singletonList(instance)); + + assertThat(polarisActiveLane.currentInstanceInLane()).isTrue(); + + } +} diff --git a/spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-mq-plugin/src/test/java/com/tencent/cloud/plugin/mq/lane/kafka/KafkaLaneAspectConfigurationTest.java b/spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-mq-plugin/src/test/java/com/tencent/cloud/plugin/mq/lane/kafka/KafkaLaneAspectConfigurationTest.java index aaafe233e..735f27082 100644 --- a/spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-mq-plugin/src/test/java/com/tencent/cloud/plugin/mq/lane/kafka/KafkaLaneAspectConfigurationTest.java +++ b/spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-mq-plugin/src/test/java/com/tencent/cloud/plugin/mq/lane/kafka/KafkaLaneAspectConfigurationTest.java @@ -30,6 +30,7 @@ import org.junit.jupiter.api.Test; import org.springframework.boot.autoconfigure.AutoConfigurations; import org.springframework.boot.test.context.runner.ApplicationContextRunner; +import org.springframework.cloud.client.serviceregistry.Registration; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -155,25 +156,6 @@ public class KafkaLaneAspectConfigurationTest { }); } - @Test - public void testTsfActiveLaneBeanCreationWhenOnlyTsfConsulEnabled() { - resetTsfContextUtilsStaticFields(); - // Simulate Only TSF Consul enabled condition - contextRunner - .withPropertyValues( - "tsf_consul_enable=true", - "tsf_consul_ip=127.0.0.1", - "spring.cloud.polaris.address=" // Empty to simulate only TSF Consul - ) - .run(context -> { - // TsfActiveLane should be created when only TSF Consul is enabled - assertThat(context).hasSingleBean(TsfActiveLane.class); - - TsfActiveLane tsfActiveLane = context.getBean(TsfActiveLane.class); - assertThat(tsfActiveLane).isNotNull(); - }); - } - @Configuration static class MockConfiguration { @Bean @@ -185,5 +167,10 @@ public class KafkaLaneAspectConfigurationTest { public PolarisDiscoveryHandler polarisDiscoveryHandler() { return mock(PolarisDiscoveryHandler.class); } + + @Bean + public Registration registration() { + return mock(Registration.class); + } } } diff --git a/spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-mq-plugin/src/test/java/com/tencent/cloud/plugin/mq/lane/kafka/KafkaLaneAspectTest.java b/spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-mq-plugin/src/test/java/com/tencent/cloud/plugin/mq/lane/kafka/KafkaLaneAspectTest.java index 49c0409bc..2f4cdacc8 100644 --- a/spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-mq-plugin/src/test/java/com/tencent/cloud/plugin/mq/lane/kafka/KafkaLaneAspectTest.java +++ b/spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-mq-plugin/src/test/java/com/tencent/cloud/plugin/mq/lane/kafka/KafkaLaneAspectTest.java @@ -20,14 +20,16 @@ package com.tencent.cloud.plugin.mq.lane.kafka; import java.lang.reflect.Field; import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.Collections; import java.util.Iterator; import java.util.List; -import java.util.concurrent.atomic.AtomicBoolean; import com.tencent.cloud.common.tsf.TsfContextUtils; -import com.tencent.cloud.plugin.mq.lane.tsf.TsfActiveLane; +import com.tencent.cloud.plugin.mq.lane.PolarisActiveLane; import com.tencent.cloud.polaris.context.PolarisSDKContextManager; +import com.tencent.cloud.polaris.discovery.PolarisDiscoveryHandler; import com.tencent.polaris.plugins.router.lane.LaneUtils; +import com.tencent.polaris.specification.api.v1.traffic.manage.LaneProto; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.header.Header; @@ -38,6 +40,7 @@ import org.junit.jupiter.api.Test; import org.mockito.MockedStatic; import org.mockito.Mockito; +import org.springframework.cloud.client.serviceregistry.Registration; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.Acknowledgment; import org.springframework.kafka.support.converter.RecordMessageConverter; @@ -51,64 +54,46 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; /** - * Test for {@link KafkaLaneAspect}. + * Test for {@link KafkaLaneAspect}. Instance in lane. */ public class KafkaLaneAspectTest { private KafkaLaneAspect kafkaLaneAspect; private PolarisSDKContextManager polarisSDKContextManager; private KafkaLaneProperties kafkaLaneProperties; - private TsfActiveLane tsfActiveLane; + private PolarisActiveLane polarisActiveLane; + private LaneProto.LaneGroup group; private MockedStatic laneUtilsMockedStatic; private MockedStatic tsfContextUtilsMockedStatic; @BeforeEach - public void setUp() { + public void setUp() throws Exception { polarisSDKContextManager = mock(PolarisSDKContextManager.class); + group = mock(LaneProto.LaneGroup.class); kafkaLaneProperties = new KafkaLaneProperties(); - tsfActiveLane = mock(TsfActiveLane.class); - + polarisActiveLane = new PolarisActiveLane(mock(PolarisSDKContextManager.class), mock(PolarisDiscoveryHandler.class), mock(Registration.class)); laneUtilsMockedStatic = Mockito.mockStatic(LaneUtils.class); tsfContextUtilsMockedStatic = Mockito.mockStatic(TsfContextUtils.class); + Field laneField = PolarisActiveLane.class.getDeclaredField("instanceLaneTag"); + laneField.setAccessible(true); + laneField.set(polarisActiveLane, "test-lane"); // in lane + + Field groupsField = PolarisActiveLane.class.getDeclaredField("groups"); + groupsField.setAccessible(true); + groupsField.set(polarisActiveLane, Collections.singletonList(group)); + + Field serviceInLaneField = PolarisActiveLane.class.getDeclaredField("serviceInLane"); + serviceInLaneField.setAccessible(true); + serviceInLaneField.setBoolean(polarisActiveLane, true); - kafkaLaneAspect = new KafkaLaneAspect(polarisSDKContextManager, kafkaLaneProperties, tsfActiveLane); + + kafkaLaneAspect = new KafkaLaneAspect(polarisSDKContextManager, kafkaLaneProperties, polarisActiveLane); } @AfterEach public void tearDown() { laneUtilsMockedStatic.close(); tsfContextUtilsMockedStatic.close(); - resetTsfContextUtilsStaticFields(); - } - - private void resetTsfContextUtilsStaticFields() { - try { - // Reset isOnlyTsfConsulEnabledFirstConfiguration - Field isOnlyTsfConsulEnabledFirstConfigurationField = TsfContextUtils.class.getDeclaredField("isOnlyTsfConsulEnabledFirstConfiguration"); - isOnlyTsfConsulEnabledFirstConfigurationField.setAccessible(true); - AtomicBoolean isOnlyTsfConsulEnabledFirstConfiguration = (AtomicBoolean) isOnlyTsfConsulEnabledFirstConfigurationField.get(null); - isOnlyTsfConsulEnabledFirstConfiguration.set(true); - - // Reset isTsfConsulEnabledFirstConfiguration - Field isTsfConsulEnabledFirstConfigurationField = TsfContextUtils.class.getDeclaredField("isTsfConsulEnabledFirstConfiguration"); - isTsfConsulEnabledFirstConfigurationField.setAccessible(true); - AtomicBoolean isTsfConsulEnabledFirstConfiguration = (AtomicBoolean) isTsfConsulEnabledFirstConfigurationField.get(null); - isTsfConsulEnabledFirstConfiguration.set(true); - - // Reset tsfConsulEnabled - Field tsfConsulEnabledField = TsfContextUtils.class.getDeclaredField("tsfConsulEnabled"); - tsfConsulEnabledField.setAccessible(true); - tsfConsulEnabledField.setBoolean(null, false); - - // Reset onlyTsfConsulEnabled - Field onlyTsfConsulEnabledField = TsfContextUtils.class.getDeclaredField("onlyTsfConsulEnabled"); - onlyTsfConsulEnabledField.setAccessible(true); - onlyTsfConsulEnabledField.setBoolean(null, false); - - } - catch (Exception e) { - throw new RuntimeException("Failed to reset TsfContextUtils static fields", e); - } } @Test @@ -166,7 +151,7 @@ public class KafkaLaneAspectTest { kafkaLaneAspect.aroundProducerMessage(pjp); // Then - Iterator
headers = producerRecord.headers().headers("X-Polaris-Metadata-Transitive-service-lane").iterator(); + Iterator
headers = producerRecord.headers().headers(polarisActiveLane.getLaneHeaderKey()).iterator(); assertThat(headers.hasNext()).isTrue(); Header laneHeader = headers.next(); assertThat(new String(laneHeader.value(), StandardCharsets.UTF_8)).isEqualTo(laneId); @@ -213,13 +198,21 @@ public class KafkaLaneAspectTest { } @Test - public void testConsumerAspectWithLaneHeader() throws Throwable { + public void testConsumerAspectWithLaneHeader_LaneIdExist() throws Throwable { // Given + LaneProto.LaneRule rule = mock(LaneProto.LaneRule.class); + when(group.getRulesList()).thenReturn(Collections.singletonList(rule)); + when(rule.getGroupName()).thenReturn("test-group"); + when(rule.getName()).thenReturn("test-lane-name"); + when(rule.getDefaultLabelValue()).thenReturn("test-lane"); kafkaLaneProperties.setLaneOn(true); - String laneId = "test-lane-id"; + String laneId = "test-group/test-lane-name"; // valid lane id + + laneUtilsMockedStatic.when(() -> LaneUtils.buildStainLabel(rule)).thenReturn(laneId); + ConsumerRecord consumerRecord = new ConsumerRecord<>("test-topic", 0, 0L, "key", "value"); - consumerRecord.headers().add("X-Polaris-Metadata-Transitive-service-lane", laneId.getBytes(StandardCharsets.UTF_8)); + consumerRecord.headers().add(polarisActiveLane.getLaneHeaderKey(), laneId.getBytes(StandardCharsets.UTF_8)); ProceedingJoinPoint pjp = mock(ProceedingJoinPoint.class); Object[] args = new Object[] {consumerRecord}; @@ -230,7 +223,7 @@ public class KafkaLaneAspectTest { Object result = kafkaLaneAspect.aroundConsumerMessage(pjp); // Then - assertThat(result).isEqualTo(KafkaLaneAspect.EMPTY_OBJECT); + assertThat(result).isEqualTo("result"); } @Test @@ -238,7 +231,8 @@ public class KafkaLaneAspectTest { // Given ConsumerRecord consumerRecord = new ConsumerRecord<>("test-topic", 0, 0L, "key", "value"); String expectedLaneId = "test-lane-id"; - consumerRecord.headers().add("X-Polaris-Metadata-Transitive-service-lane", expectedLaneId.getBytes(StandardCharsets.UTF_8)); + consumerRecord.headers() + .add(polarisActiveLane.getLaneHeaderKey(), expectedLaneId.getBytes(StandardCharsets.UTF_8)); // When String laneId = kafkaLaneAspect.getConsumerRecordLaneId(consumerRecord); @@ -258,29 +252,24 @@ public class KafkaLaneAspectTest { String laneId = kafkaLaneAspect.getConsumerRecordLaneId(consumerRecord); // Then - assertThat(laneId).isEqualTo("tsf/" + expectedLaneId); + assertThat(laneId).isEqualTo(expectedLaneId); } @Test - public void testIfConsumeInPolarisWithNoLaneId() { + public void testIfConsumeWithNoLaneId() { // Given kafkaLaneProperties.setLaneConsumeMain(true); - - // Use reflection to set the lane field - try { - Field laneField = KafkaLaneAspect.class.getDeclaredField("lane"); - laneField.setAccessible(true); - laneField.set(kafkaLaneAspect, "test-lane"); - } - catch (Exception e) { - throw new RuntimeException(e); - } - // When boolean shouldConsume = kafkaLaneAspect.ifConsume(""); - // Then assertThat(shouldConsume).isTrue(); // Because laneConsumeMain is true + + // Given + kafkaLaneProperties.setLaneConsumeMain(false); + // When + shouldConsume = kafkaLaneAspect.ifConsume(""); + // Then + assertThat(shouldConsume).isFalse(); // Because laneConsumeMain is false } @Test @@ -290,9 +279,9 @@ public class KafkaLaneAspectTest { List messageList = new ArrayList<>(); ConsumerRecord record1 = new ConsumerRecord<>("test-topic", 0, 0L, "key", "value1"); - record1.headers().add("X-Polaris-Metadata-Transitive-service-lane", "lane1".getBytes(StandardCharsets.UTF_8)); + record1.headers().add(polarisActiveLane.getLaneHeaderKey(), "lane1".getBytes(StandardCharsets.UTF_8)); ConsumerRecord record2 = new ConsumerRecord<>("test-topic", 0, 1L, "key", "value2"); - record2.headers().add("X-Polaris-Metadata-Transitive-service-lane", "lane2".getBytes(StandardCharsets.UTF_8)); + record2.headers().add(polarisActiveLane.getLaneHeaderKey(), "lane2".getBytes(StandardCharsets.UTF_8)); messageList.add(record1); messageList.add(record2); diff --git a/spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-mq-plugin/src/test/java/com/tencent/cloud/plugin/mq/lane/kafka/KafkaLaneAspectTest2.java b/spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-mq-plugin/src/test/java/com/tencent/cloud/plugin/mq/lane/kafka/KafkaLaneAspectTest2.java new file mode 100644 index 000000000..77b6cf332 --- /dev/null +++ b/spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-mq-plugin/src/test/java/com/tencent/cloud/plugin/mq/lane/kafka/KafkaLaneAspectTest2.java @@ -0,0 +1,333 @@ +/* + * Tencent is pleased to support the open source community by making spring-cloud-tencent available. + * + * Copyright (C) 2021 Tencent. All rights reserved. + * + * Licensed under the BSD 3-Clause License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://opensource.org/licenses/BSD-3-Clause + * + * Unless required by applicable law or agreed to in writing, software distributed + * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package com.tencent.cloud.plugin.mq.lane.kafka; + +import java.lang.reflect.Field; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; + +import com.tencent.cloud.common.tsf.TsfContextUtils; +import com.tencent.cloud.plugin.mq.lane.PolarisActiveLane; +import com.tencent.cloud.polaris.context.PolarisSDKContextManager; +import com.tencent.cloud.polaris.discovery.PolarisDiscoveryHandler; +import com.tencent.polaris.plugins.router.lane.LaneUtils; +import com.tencent.polaris.specification.api.v1.traffic.manage.LaneProto; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.header.Header; +import org.aspectj.lang.ProceedingJoinPoint; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.MockedStatic; +import org.mockito.Mockito; + +import org.springframework.cloud.client.serviceregistry.Registration; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.support.Acknowledgment; +import org.springframework.kafka.support.converter.RecordMessageConverter; +import org.springframework.messaging.Message; +import org.springframework.messaging.support.MessageBuilder; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +/** + * Test for {@link KafkaLaneAspect}. Instance and service not in lane. + */ +public class KafkaLaneAspectTest2 { + + private KafkaLaneAspect kafkaLaneAspect; + private PolarisSDKContextManager polarisSDKContextManager; + private KafkaLaneProperties kafkaLaneProperties; + private PolarisActiveLane polarisActiveLane; + private LaneProto.LaneGroup group; + private MockedStatic laneUtilsMockedStatic; + private MockedStatic tsfContextUtilsMockedStatic; + + @BeforeEach + public void setUp() throws Exception { + polarisSDKContextManager = mock(PolarisSDKContextManager.class); + group = mock(LaneProto.LaneGroup.class); + kafkaLaneProperties = new KafkaLaneProperties(); + polarisActiveLane = new PolarisActiveLane(mock(PolarisSDKContextManager.class), mock(PolarisDiscoveryHandler.class), mock(Registration.class)); + laneUtilsMockedStatic = Mockito.mockStatic(LaneUtils.class); + tsfContextUtilsMockedStatic = Mockito.mockStatic(TsfContextUtils.class); + + Field laneField = PolarisActiveLane.class.getDeclaredField("instanceLaneTag"); + laneField.setAccessible(true); + laneField.set(polarisActiveLane, ""); // not in lane + + Field groupsField = PolarisActiveLane.class.getDeclaredField("groups"); + groupsField.setAccessible(true); + groupsField.set(polarisActiveLane, Collections.singletonList(group)); + + Field serviceInLaneField = PolarisActiveLane.class.getDeclaredField("serviceInLane"); + serviceInLaneField.setAccessible(true); + serviceInLaneField.setBoolean(polarisActiveLane, false); + + + kafkaLaneAspect = new KafkaLaneAspect(polarisSDKContextManager, kafkaLaneProperties, polarisActiveLane); + } + + @AfterEach + public void tearDown() { + laneUtilsMockedStatic.close(); + tsfContextUtilsMockedStatic.close(); + } + + @Test + public void testProducerAspectWhenLaneDisabled() throws Throwable { + // Given + kafkaLaneProperties.setLaneOn(false); + ProceedingJoinPoint pjp = mock(ProceedingJoinPoint.class); + Object[] args = new Object[] {mock(ProducerRecord.class)}; + when(pjp.getArgs()).thenReturn(args); + when(pjp.proceed(args)).thenReturn("result"); + + // When + Object result = kafkaLaneAspect.aroundProducerMessage(pjp); + + // Then + assertThat(result).isEqualTo("result"); + verify(pjp).proceed(args); + } + + @Test + public void testProducerAspectWhenNoLaneId() throws Throwable { + // Given + kafkaLaneProperties.setLaneOn(true); + laneUtilsMockedStatic.when(() -> LaneUtils.fetchLaneByCaller(any(), any(), any())).thenReturn(null); + + ProceedingJoinPoint pjp = mock(ProceedingJoinPoint.class); + Object[] args = new Object[] {mock(ProducerRecord.class)}; + when(pjp.getArgs()).thenReturn(args); + when(pjp.proceed(args)).thenReturn("result"); + + // When + Object result = kafkaLaneAspect.aroundProducerMessage(pjp); + + // Then + assertThat(result).isEqualTo("result"); + verify(pjp).proceed(args); + } + + @Test + public void testProducerAspectWithProducerRecord() throws Throwable { + // Given + kafkaLaneProperties.setLaneOn(true); + String laneId = "test-lane-id"; + laneUtilsMockedStatic.when(() -> LaneUtils.fetchLaneByCaller(any(), any(), any())).thenReturn(laneId); + + ProceedingJoinPoint pjp = mock(ProceedingJoinPoint.class); + KafkaTemplate kafkaTemplate = mock(KafkaTemplate.class); + ProducerRecord producerRecord = new ProducerRecord<>("test-topic", "test-value"); + + when(pjp.getTarget()).thenReturn(kafkaTemplate); + when(pjp.getArgs()).thenReturn(new Object[] {producerRecord}); + when(kafkaTemplate.send(producerRecord)).thenReturn(null); + + // When + kafkaLaneAspect.aroundProducerMessage(pjp); + + // Then + Iterator
headers = producerRecord.headers().headers(polarisActiveLane.getLaneHeaderKey()).iterator(); + assertThat(headers.hasNext()).isTrue(); + Header laneHeader = headers.next(); + assertThat(new String(laneHeader.value(), StandardCharsets.UTF_8)).isEqualTo(laneId); + } + + @Test + public void testProducerAspectWithMessage() throws Throwable { + // Given + kafkaLaneProperties.setLaneOn(true); + String laneId = "test-lane-id"; + laneUtilsMockedStatic.when(() -> LaneUtils.fetchLaneByCaller(any(), any(), any())).thenReturn(laneId); + + ProceedingJoinPoint pjp = mock(ProceedingJoinPoint.class); + KafkaTemplate kafkaTemplate = mock(KafkaTemplate.class); + RecordMessageConverter recordMessageConverter = mock(RecordMessageConverter.class); + Message message = MessageBuilder.withPayload("test-payload").build(); + ProducerRecord producerRecord = new ProducerRecord<>("test-topic", "test-value"); + + when(pjp.getTarget()).thenReturn(kafkaTemplate); + when(pjp.getArgs()).thenReturn(new Object[] {message}); + when(kafkaTemplate.getDefaultTopic()).thenReturn("test-topic"); + when(kafkaTemplate.send(producerRecord)).thenReturn(null); + when(kafkaTemplate.getMessageConverter()).thenReturn(recordMessageConverter); + + // When + kafkaLaneAspect.aroundProducerMessage(pjp); + } + + @Test + public void testConsumerAspectWhenLaneDisabled() throws Throwable { + // Given + kafkaLaneProperties.setLaneOn(false); + ProceedingJoinPoint pjp = mock(ProceedingJoinPoint.class); + Object[] args = new Object[] {mock(ConsumerRecord.class)}; + when(pjp.getArgs()).thenReturn(args); + when(pjp.proceed(args)).thenReturn("result"); + + // When + Object result = kafkaLaneAspect.aroundConsumerMessage(pjp); + + // Then + assertThat(result).isEqualTo("result"); + verify(pjp).proceed(args); + } + + @Test + public void testConsumerAspectWithLaneHeader_LaneIdExist() throws Throwable { + // Given + LaneProto.LaneRule rule = mock(LaneProto.LaneRule.class); + when(group.getRulesList()).thenReturn(Collections.singletonList(rule)); + when(rule.getGroupName()).thenReturn("test-group"); + when(rule.getName()).thenReturn("test-lane-name"); + when(rule.getDefaultLabelValue()).thenReturn("test-lane"); + kafkaLaneProperties.setLaneOn(true); + String laneId = "test-group/test-lane-name"; // valid lane id + + laneUtilsMockedStatic.when(() -> LaneUtils.buildStainLabel(rule)).thenReturn(laneId); + + + ConsumerRecord consumerRecord = new ConsumerRecord<>("test-topic", 0, 0L, "key", "value"); + consumerRecord.headers().add(polarisActiveLane.getLaneHeaderKey(), laneId.getBytes(StandardCharsets.UTF_8)); + + ProceedingJoinPoint pjp = mock(ProceedingJoinPoint.class); + Object[] args = new Object[] {consumerRecord}; + when(pjp.getArgs()).thenReturn(args); + when(pjp.proceed(args)).thenReturn("result"); + + // When + Object result = kafkaLaneAspect.aroundConsumerMessage(pjp); + + // Then + assertThat(result).isEqualTo(KafkaLaneAspect.EMPTY_OBJECT); + + // change main consume lane + kafkaLaneProperties.setMainConsumeLane(true); + result = kafkaLaneAspect.aroundConsumerMessage(pjp); + assertThat(result).isEqualTo("result"); + + //reset + kafkaLaneProperties.setMainConsumeLane(false); + + } + + @Test + public void testGetConsumerRecordLaneIdFromHeader() { + // Given + ConsumerRecord consumerRecord = new ConsumerRecord<>("test-topic", 0, 0L, "key", "value"); + String expectedLaneId = "test-lane-id"; + consumerRecord.headers() + .add(polarisActiveLane.getLaneHeaderKey(), expectedLaneId.getBytes(StandardCharsets.UTF_8)); + + // When + String laneId = kafkaLaneAspect.getConsumerRecordLaneId(consumerRecord); + + // Then + assertThat(laneId).isEqualTo(expectedLaneId); + } + + @Test + public void testGetConsumerRecordLaneIdFromCallerLane() { + // Given + ConsumerRecord consumerRecord = new ConsumerRecord<>("test-topic", 0, 0L, "key", "value"); + String expectedLaneId = "lane-test"; + laneUtilsMockedStatic.when(() -> LaneUtils.getCallerLaneId()).thenReturn(expectedLaneId); + + // When + String laneId = kafkaLaneAspect.getConsumerRecordLaneId(consumerRecord); + + // Then + assertThat(laneId).isEqualTo(expectedLaneId); + } + + @Test + public void testIfConsumeWithNoLaneId() { + // Given + kafkaLaneProperties.setLaneConsumeMain(true); + // When + boolean shouldConsume = kafkaLaneAspect.ifConsume(""); + // Then + assertThat(shouldConsume).isTrue(); // as baseline + + // Given + kafkaLaneProperties.setLaneConsumeMain(false); + // When + shouldConsume = kafkaLaneAspect.ifConsume(""); + // Then + assertThat(shouldConsume).isTrue(); // unrelated to LaneConsumeMain + } + + @Test + public void testConsumerAspectWithBatchMessages() throws Throwable { + // Given + kafkaLaneProperties.setLaneOn(true); + + List messageList = new ArrayList<>(); + ConsumerRecord record1 = new ConsumerRecord<>("test-topic", 0, 0L, "key", "value1"); + record1.headers().add(polarisActiveLane.getLaneHeaderKey(), "lane1".getBytes(StandardCharsets.UTF_8)); + ConsumerRecord record2 = new ConsumerRecord<>("test-topic", 0, 1L, "key", "value2"); + record2.headers().add(polarisActiveLane.getLaneHeaderKey(), "lane2".getBytes(StandardCharsets.UTF_8)); + + messageList.add(record1); + messageList.add(record2); + + Acknowledgment acknowledgment = mock(Acknowledgment.class); + ProceedingJoinPoint pjp = mock(ProceedingJoinPoint.class); + Object[] args = new Object[] {messageList, acknowledgment}; + + when(pjp.getArgs()).thenReturn(args); + when(pjp.proceed(any())).thenReturn("result"); + + // When + Object result = kafkaLaneAspect.aroundConsumerMessage(pjp); + + // Then + assertThat(result).isEqualTo("result"); + verify(pjp).proceed(any()); + } + + @Test + public void testConsumerAspectWithEmptyBatch() throws Throwable { + // Given + kafkaLaneProperties.setLaneOn(true); + + List emptyList = new ArrayList<>(); + ProceedingJoinPoint pjp = mock(ProceedingJoinPoint.class); + Object[] args = new Object[] {emptyList}; + + when(pjp.getArgs()).thenReturn(args); + when(pjp.proceed(args)).thenReturn("result"); + + // When + Object result = kafkaLaneAspect.aroundConsumerMessage(pjp); + + // Then + assertThat(result).isEqualTo("result"); + verify(pjp).proceed(args); + } +} diff --git a/spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-mq-plugin/src/test/java/com/tencent/cloud/plugin/mq/lane/kafka/KafkaLaneAspectTest3.java b/spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-mq-plugin/src/test/java/com/tencent/cloud/plugin/mq/lane/kafka/KafkaLaneAspectTest3.java new file mode 100644 index 000000000..0438cb25d --- /dev/null +++ b/spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-mq-plugin/src/test/java/com/tencent/cloud/plugin/mq/lane/kafka/KafkaLaneAspectTest3.java @@ -0,0 +1,172 @@ +/* + * Tencent is pleased to support the open source community by making spring-cloud-tencent available. + * + * Copyright (C) 2021 Tencent. All rights reserved. + * + * Licensed under the BSD 3-Clause License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://opensource.org/licenses/BSD-3-Clause + * + * Unless required by applicable law or agreed to in writing, software distributed + * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package com.tencent.cloud.plugin.mq.lane.kafka; + +import java.lang.reflect.Field; +import java.nio.charset.StandardCharsets; +import java.util.Collections; + +import com.tencent.cloud.common.tsf.TsfContextUtils; +import com.tencent.cloud.plugin.mq.lane.PolarisActiveLane; +import com.tencent.cloud.polaris.context.PolarisSDKContextManager; +import com.tencent.cloud.polaris.discovery.PolarisDiscoveryHandler; +import com.tencent.polaris.plugins.router.lane.LaneUtils; +import com.tencent.polaris.specification.api.v1.traffic.manage.LaneProto; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.aspectj.lang.ProceedingJoinPoint; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.MockedStatic; +import org.mockito.Mockito; + +import org.springframework.cloud.client.serviceregistry.Registration; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Test for {@link KafkaLaneAspect}. + * Instance and service status change. + */ +public class KafkaLaneAspectTest3 { + + private KafkaLaneAspect kafkaLaneAspect; + private PolarisSDKContextManager polarisSDKContextManager; + private KafkaLaneProperties kafkaLaneProperties; + private PolarisActiveLane polarisActiveLane; + private LaneProto.LaneGroup group; + private MockedStatic laneUtilsMockedStatic; + private MockedStatic tsfContextUtilsMockedStatic; + + @BeforeEach + public void setUp() throws Exception { + polarisSDKContextManager = mock(PolarisSDKContextManager.class); + group = mock(LaneProto.LaneGroup.class); + kafkaLaneProperties = new KafkaLaneProperties(); + kafkaLaneProperties.setLaneOn(true); + polarisActiveLane = new PolarisActiveLane(mock(PolarisSDKContextManager.class), mock(PolarisDiscoveryHandler.class), mock(Registration.class)); + laneUtilsMockedStatic = Mockito.mockStatic(LaneUtils.class); + tsfContextUtilsMockedStatic = Mockito.mockStatic(TsfContextUtils.class); + + + kafkaLaneAspect = new KafkaLaneAspect(polarisSDKContextManager, kafkaLaneProperties, polarisActiveLane); + } + + @AfterEach + public void tearDown() { + laneUtilsMockedStatic.close(); + tsfContextUtilsMockedStatic.close(); + } + + @Test + public void testConsumerAspectWithLaneHeader_toInLane() throws Throwable { + // not in lane + Field laneField = PolarisActiveLane.class.getDeclaredField("instanceLaneTag"); + laneField.setAccessible(true); + laneField.set(polarisActiveLane, ""); + + Field groupsField = PolarisActiveLane.class.getDeclaredField("groups"); + groupsField.setAccessible(true); + groupsField.set(polarisActiveLane, Collections.singletonList(group)); + + Field serviceInLaneField = PolarisActiveLane.class.getDeclaredField("serviceInLane"); + serviceInLaneField.setAccessible(true); + serviceInLaneField.setBoolean(polarisActiveLane, false); + + String laneId = "test-group/test-lane-name"; // valid lane id + ConsumerRecord consumerRecord = new ConsumerRecord<>("test-topic", 0, 0L, "key", "value"); + consumerRecord.headers().add(polarisActiveLane.getLaneHeaderKey(), laneId.getBytes(StandardCharsets.UTF_8)); + + ProceedingJoinPoint pjp = mock(ProceedingJoinPoint.class); + Object[] args = new Object[] {consumerRecord}; + when(pjp.getArgs()).thenReturn(args); + when(pjp.proceed(args)).thenReturn("result"); + + // act + Object result = kafkaLaneAspect.aroundConsumerMessage(pjp); + + // verify + assertThat(result).isEqualTo(KafkaLaneAspect.EMPTY_OBJECT); // not in lane, not consume + + // in lane + + LaneProto.LaneRule rule = mock(LaneProto.LaneRule.class); + when(group.getRulesList()).thenReturn(Collections.singletonList(rule)); + when(rule.getGroupName()).thenReturn("test-group"); + when(rule.getName()).thenReturn("test-lane-name"); + when(rule.getDefaultLabelValue()).thenReturn("test-lane"); + + laneField.set(polarisActiveLane, "test-lane"); + serviceInLaneField.setBoolean(polarisActiveLane, true); + + laneUtilsMockedStatic.when(() -> LaneUtils.buildStainLabel(rule)).thenReturn(laneId); + + result = kafkaLaneAspect.aroundConsumerMessage(pjp); + assertThat(result).isEqualTo("result"); + } + + @Test + public void testConsumerAspectWithLaneHeader_toNotInLane() throws Throwable { + // in lane + String laneId = "test-group/test-lane-name"; // valid lane id + LaneProto.LaneRule rule = mock(LaneProto.LaneRule.class); + when(group.getRulesList()).thenReturn(Collections.singletonList(rule)); + when(rule.getGroupName()).thenReturn("test-group"); + when(rule.getName()).thenReturn("test-lane-name"); + when(rule.getDefaultLabelValue()).thenReturn("test-lane"); + laneUtilsMockedStatic.when(() -> LaneUtils.buildStainLabel(rule)).thenReturn(laneId); + + Field laneField = PolarisActiveLane.class.getDeclaredField("instanceLaneTag"); + laneField.setAccessible(true); + laneField.set(polarisActiveLane, "test-lane"); + + Field groupsField = PolarisActiveLane.class.getDeclaredField("groups"); + groupsField.setAccessible(true); + groupsField.set(polarisActiveLane, Collections.singletonList(group)); + + Field serviceInLaneField = PolarisActiveLane.class.getDeclaredField("serviceInLane"); + serviceInLaneField.setAccessible(true); + serviceInLaneField.setBoolean(polarisActiveLane, true); + + + ConsumerRecord consumerRecord = new ConsumerRecord<>("test-topic", 0, 0L, "key", "value"); + consumerRecord.headers().add(polarisActiveLane.getLaneHeaderKey(), laneId.getBytes(StandardCharsets.UTF_8)); + + ProceedingJoinPoint pjp = mock(ProceedingJoinPoint.class); + Object[] args = new Object[] {consumerRecord}; + when(pjp.getArgs()).thenReturn(args); + when(pjp.proceed(args)).thenReturn("result"); + + // act + Object result = kafkaLaneAspect.aroundConsumerMessage(pjp); + + // verify + assertThat(result).isEqualTo("result"); + + + // not in lane + when(group.getRulesList()).thenReturn(Collections.emptyList()); + laneField.set(polarisActiveLane, ""); + serviceInLaneField.setBoolean(polarisActiveLane, false); + + result = kafkaLaneAspect.aroundConsumerMessage(pjp); + assertThat(result).isEqualTo(KafkaLaneAspect.EMPTY_OBJECT); + } +} diff --git a/spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-mq-plugin/src/test/java/com/tencent/cloud/plugin/mq/lane/kafka/TsfKafkaLaneAspectConfigurationTest.java b/spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-mq-plugin/src/test/java/com/tencent/cloud/plugin/mq/lane/kafka/TsfKafkaLaneAspectConfigurationTest.java new file mode 100644 index 000000000..6e99d927f --- /dev/null +++ b/spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-mq-plugin/src/test/java/com/tencent/cloud/plugin/mq/lane/kafka/TsfKafkaLaneAspectConfigurationTest.java @@ -0,0 +1,142 @@ +/* + * Tencent is pleased to support the open source community by making spring-cloud-tencent available. + * + * Copyright (C) 2021 Tencent. All rights reserved. + * + * Licensed under the BSD 3-Clause License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://opensource.org/licenses/BSD-3-Clause + * + * Unless required by applicable law or agreed to in writing, software distributed + * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package com.tencent.cloud.plugin.mq.lane.kafka; + +import java.lang.reflect.Field; + +import com.tencent.cloud.common.tsf.TsfContextUtils; +import com.tencent.cloud.plugin.mq.lane.tsf.TsfActiveLane; +import com.tencent.cloud.polaris.context.PolarisSDKContextManager; +import com.tencent.cloud.polaris.discovery.PolarisDiscoveryHandler; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import org.springframework.boot.autoconfigure.AutoConfigurations; +import org.springframework.boot.test.context.runner.ApplicationContextRunner; +import org.springframework.cloud.client.serviceregistry.Registration; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; + +/** + * Test for {@link KafkaLaneAspectConfiguration}. + */ +public class TsfKafkaLaneAspectConfigurationTest { + + private ApplicationContextRunner contextRunner; + + @BeforeEach + public void setUp() throws Exception { + // Reset onlyTsfConsulEnabled + Field onlyTsfConsulEnabledField = TsfContextUtils.class.getDeclaredField("onlyTsfConsulEnabled"); + onlyTsfConsulEnabledField.setAccessible(true); + onlyTsfConsulEnabledField.setBoolean(null, true); + + contextRunner = new ApplicationContextRunner() + .withPropertyValues( + "tsf_consul_enable=true", + "tsf_consul_ip=127.0.0.1", + "spring.cloud.polaris.address=" // Empty to simulate only TSF Consul + ) + .withConfiguration(AutoConfigurations.of(KafkaLaneAspectConfiguration.class)) + .withUserConfiguration(MockConfiguration.class); + } + + @Test + public void testKafkaLaneAspectBeanCreationWhenKafkaTemplatePresent() { + // Simulate KafkaTemplate class presence + contextRunner + .withPropertyValues("spring.cloud.polaris.lane.kafka.lane-on=true") + .run(context -> { + // KafkaLaneAspect should be created when KafkaTemplate is present + assertThat(context).hasSingleBean(KafkaLaneAspect.class); + + KafkaLaneAspect aspect = context.getBean(KafkaLaneAspect.class); + assertThat(aspect).isNotNull(); + }); + } + + @Test + public void testKafkaLanePropertiesEnabled() { + contextRunner + .withPropertyValues( + "spring.cloud.polaris.lane.kafka.lane-on=true", + "spring.cloud.polaris.lane.kafka.lane-consume-main=true", + "spring.cloud.polaris.lane.kafka.main-consume-lane=true" + ) + .run(context -> { + KafkaLaneProperties properties = context.getBean(KafkaLaneProperties.class); + assertThat(properties.getLaneOn()).isTrue(); + assertThat(properties.getLaneConsumeMain()).isTrue(); + assertThat(properties.getMainConsumeLane()).isTrue(); + }); + } + + @Test + public void testBeanDependenciesInjection() { + contextRunner + .withPropertyValues("spring.cloud.polaris.lane.kafka.lane-on=true") + .run(context -> { + // Verify that all required dependencies are properly injected + assertThat(context).hasSingleBean(KafkaLaneAspect.class); + + KafkaLaneAspect aspect = context.getBean(KafkaLaneAspect.class); + + // The aspect should have all required dependencies + assertThat(aspect).isNotNull(); + + // Verify that KafkaLaneProperties is properly configured + KafkaLaneProperties properties = context.getBean(KafkaLaneProperties.class); + assertThat(properties).isNotNull(); + assertThat(properties.getLaneOn()).isTrue(); + }); + } + + @Test + public void testTsfActiveLaneBeanCreationWhenOnlyTsfConsulEnabled() { + // Simulate Only TSF Consul enabled condition + contextRunner + .run(context -> { + // TsfActiveLane should be created when only TSF Consul is enabled + assertThat(context).hasSingleBean(TsfActiveLane.class); + + TsfActiveLane tsfActiveLane = context.getBean(TsfActiveLane.class); + assertThat(tsfActiveLane).isNotNull(); + }); + } + + @Configuration + static class MockConfiguration { + @Bean + public PolarisSDKContextManager polarisSDKContextManager() { + return mock(PolarisSDKContextManager.class); + } + + @Bean + public PolarisDiscoveryHandler polarisDiscoveryHandler() { + return mock(PolarisDiscoveryHandler.class); + } + + @Bean + public Registration registration() { + return mock(Registration.class); + } + } +} diff --git a/spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-mq-plugin/src/test/java/com/tencent/cloud/plugin/mq/lane/kafka/TsfKafkaLaneAspectTest.java b/spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-mq-plugin/src/test/java/com/tencent/cloud/plugin/mq/lane/kafka/TsfKafkaLaneAspectTest.java index a870b825d..c993bf883 100644 --- a/spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-mq-plugin/src/test/java/com/tencent/cloud/plugin/mq/lane/kafka/TsfKafkaLaneAspectTest.java +++ b/spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-mq-plugin/src/test/java/com/tencent/cloud/plugin/mq/lane/kafka/TsfKafkaLaneAspectTest.java @@ -20,14 +20,16 @@ package com.tencent.cloud.plugin.mq.lane.kafka; import java.lang.reflect.Field; import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; +import java.util.HashSet; import java.util.Iterator; import java.util.List; -import java.util.concurrent.atomic.AtomicBoolean; import com.tencent.cloud.common.tsf.TsfContextUtils; import com.tencent.cloud.plugin.mq.lane.tsf.TsfActiveLane; import com.tencent.cloud.polaris.context.PolarisSDKContextManager; +import com.tencent.cloud.polaris.discovery.PolarisDiscoveryHandler; import com.tencent.polaris.plugins.router.lane.LaneUtils; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.producer.ProducerRecord; @@ -52,7 +54,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; /** - * Test for {@link KafkaLaneAspect}, with only tsf consul enabled. + * Test for {@link KafkaLaneAspect}, with only tsf consul enabled. Instance in lane. */ public class TsfKafkaLaneAspectTest { @@ -64,15 +66,19 @@ public class TsfKafkaLaneAspectTest { private MockedStatic tsfContextUtilsMockedStatic; @BeforeEach - public void setUp() { + public void setUp() throws Exception { polarisSDKContextManager = mock(PolarisSDKContextManager.class); kafkaLaneProperties = new KafkaLaneProperties(); - tsfActiveLane = mock(TsfActiveLane.class); - + tsfActiveLane = new TsfActiveLane(polarisSDKContextManager, mock(PolarisDiscoveryHandler.class)); laneUtilsMockedStatic = Mockito.mockStatic(LaneUtils.class); tsfContextUtilsMockedStatic = Mockito.mockStatic(TsfContextUtils.class); tsfContextUtilsMockedStatic.when(TsfContextUtils::isOnlyTsfConsulEnabled).thenReturn(true); + // reset currentGroupLaneIds, instance in lane + Field currentGroupLaneIdsField = TsfActiveLane.class.getDeclaredField("currentGroupLaneIds"); + currentGroupLaneIdsField.setAccessible(true); + currentGroupLaneIdsField.set(tsfActiveLane, new HashSet<>(Collections.singletonList("lane1"))); + kafkaLaneAspect = new KafkaLaneAspect(polarisSDKContextManager, kafkaLaneProperties, tsfActiveLane); } @@ -80,37 +86,6 @@ public class TsfKafkaLaneAspectTest { public void tearDown() { laneUtilsMockedStatic.close(); tsfContextUtilsMockedStatic.close(); - resetTsfContextUtilsStaticFields(); - } - - private void resetTsfContextUtilsStaticFields() { - try { - // Reset isOnlyTsfConsulEnabledFirstConfiguration - Field isOnlyTsfConsulEnabledFirstConfigurationField = TsfContextUtils.class.getDeclaredField("isOnlyTsfConsulEnabledFirstConfiguration"); - isOnlyTsfConsulEnabledFirstConfigurationField.setAccessible(true); - AtomicBoolean isOnlyTsfConsulEnabledFirstConfiguration = (AtomicBoolean) isOnlyTsfConsulEnabledFirstConfigurationField.get(null); - isOnlyTsfConsulEnabledFirstConfiguration.set(true); - - // Reset isTsfConsulEnabledFirstConfiguration - Field isTsfConsulEnabledFirstConfigurationField = TsfContextUtils.class.getDeclaredField("isTsfConsulEnabledFirstConfiguration"); - isTsfConsulEnabledFirstConfigurationField.setAccessible(true); - AtomicBoolean isTsfConsulEnabledFirstConfiguration = (AtomicBoolean) isTsfConsulEnabledFirstConfigurationField.get(null); - isTsfConsulEnabledFirstConfiguration.set(true); - - // Reset tsfConsulEnabled - Field tsfConsulEnabledField = TsfContextUtils.class.getDeclaredField("tsfConsulEnabled"); - tsfConsulEnabledField.setAccessible(true); - tsfConsulEnabledField.setBoolean(null, false); - - // Reset onlyTsfConsulEnabled - Field onlyTsfConsulEnabledField = TsfContextUtils.class.getDeclaredField("onlyTsfConsulEnabled"); - onlyTsfConsulEnabledField.setAccessible(true); - onlyTsfConsulEnabledField.setBoolean(null, false); - - } - catch (Exception e) { - throw new RuntimeException("Failed to reset TsfContextUtils static fields", e); - } } @Test @@ -168,7 +143,7 @@ public class TsfKafkaLaneAspectTest { kafkaLaneAspect.aroundProducerMessage(pjp); // Then - Iterator
headers = producerRecord.headers().headers("tsf_laneId").iterator(); + Iterator
headers = producerRecord.headers().headers(tsfActiveLane.getLaneHeaderKey()).iterator(); assertThat(headers.hasNext()).isTrue(); Header laneHeader = headers.next(); assertThat(new String(laneHeader.value(), StandardCharsets.UTF_8)).isEqualTo(laneId); @@ -215,14 +190,16 @@ public class TsfKafkaLaneAspectTest { } @Test - public void testConsumerAspectWithLaneHeader() throws Throwable { + public void testConsumerAspectWithLaneHeader_LaneIdExist() throws Throwable { // Given kafkaLaneProperties.setLaneOn(true); - String laneId = "test-lane-id"; - when(tsfActiveLane.isLaneExist(laneId)).thenReturn(true); + Field currentGroupLaneIdsField = TsfActiveLane.class.getDeclaredField("currentGroupLaneIds"); + currentGroupLaneIdsField.setAccessible(true); + currentGroupLaneIdsField.set(tsfActiveLane, new HashSet<>(Collections.singletonList("lane-test"))); + String laneId = "tsf/lane-test"; // valid lane id ConsumerRecord consumerRecord = new ConsumerRecord<>("test-topic", 0, 0L, "key", "value"); - consumerRecord.headers().add("tsf_laneId", laneId.getBytes(StandardCharsets.UTF_8)); + consumerRecord.headers().add(tsfActiveLane.getLaneHeaderKey(), laneId.getBytes(StandardCharsets.UTF_8)); ProceedingJoinPoint pjp = mock(ProceedingJoinPoint.class); Object[] args = new Object[] {consumerRecord}; @@ -233,7 +210,7 @@ public class TsfKafkaLaneAspectTest { Object result = kafkaLaneAspect.aroundConsumerMessage(pjp); // Then - assertThat(result).isEqualTo(KafkaLaneAspect.EMPTY_OBJECT); + assertThat(result).isEqualTo("result"); } @Test @@ -241,7 +218,7 @@ public class TsfKafkaLaneAspectTest { // Given ConsumerRecord consumerRecord = new ConsumerRecord<>("test-topic", 0, 0L, "key", "value"); String expectedLaneId = "test-lane-id"; - consumerRecord.headers().add("tsf_laneId", expectedLaneId.getBytes(StandardCharsets.UTF_8)); + consumerRecord.headers().add(tsfActiveLane.getLaneHeaderKey(), expectedLaneId.getBytes(StandardCharsets.UTF_8)); // When String laneId = kafkaLaneAspect.getConsumerRecordLaneId(consumerRecord); @@ -265,38 +242,25 @@ public class TsfKafkaLaneAspectTest { } @Test - public void testIfConsumeInPolarisWithNoLaneId() { - // Given - kafkaLaneProperties.setLaneConsumeMain(true); + public void testIfConsumeWithNoLaneId() throws Exception { - // Use reflection to set the lane field - try { - Field laneField = KafkaLaneAspect.class.getDeclaredField("lane"); - laneField.setAccessible(true); - laneField.set(kafkaLaneAspect, "test-lane"); - } - catch (Exception e) { - throw new RuntimeException(e); - } + Field currentGroupLaneIdsField = TsfActiveLane.class.getDeclaredField("currentGroupLaneIds"); + currentGroupLaneIdsField.setAccessible(true); + currentGroupLaneIdsField.set(tsfActiveLane, new HashSet<>(Arrays.asList("lane1", "lane2"))); + // Given + kafkaLaneProperties.setLaneConsumeMain(true); // When boolean shouldConsume = kafkaLaneAspect.ifConsume(""); - // Then assertThat(shouldConsume).isTrue(); // Because laneConsumeMain is true - } - @Test - public void testIfConsumeInTsfWithNoLaneId() { // Given - kafkaLaneProperties.setLaneConsumeMain(true); - when(tsfActiveLane.getCurrentGroupLaneIds()).thenReturn(Collections.emptySet()); - + kafkaLaneProperties.setLaneConsumeMain(false); // When - boolean shouldConsume = kafkaLaneAspect.ifConsume(""); - + shouldConsume = kafkaLaneAspect.ifConsume(""); // Then - assertThat(shouldConsume).isTrue(); // Because laneConsumeMain is true + assertThat(shouldConsume).isFalse(); // Because laneConsumeMain is false } @Test @@ -306,9 +270,9 @@ public class TsfKafkaLaneAspectTest { List messageList = new ArrayList<>(); ConsumerRecord record1 = new ConsumerRecord<>("test-topic", 0, 0L, "key", "value1"); - record1.headers().add("tsf_laneId", "lane1".getBytes(StandardCharsets.UTF_8)); + record1.headers().add(tsfActiveLane.getLaneHeaderKey(), "lane1".getBytes(StandardCharsets.UTF_8)); ConsumerRecord record2 = new ConsumerRecord<>("test-topic", 0, 1L, "key", "value2"); - record2.headers().add("tsf_laneId", "lane2".getBytes(StandardCharsets.UTF_8)); + record2.headers().add(tsfActiveLane.getLaneHeaderKey(), "lane2".getBytes(StandardCharsets.UTF_8)); messageList.add(record1); messageList.add(record2); diff --git a/spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-mq-plugin/src/test/java/com/tencent/cloud/plugin/mq/lane/kafka/TsfKafkaLaneAspectTest2.java b/spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-mq-plugin/src/test/java/com/tencent/cloud/plugin/mq/lane/kafka/TsfKafkaLaneAspectTest2.java new file mode 100644 index 000000000..dbdfbcf42 --- /dev/null +++ b/spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-mq-plugin/src/test/java/com/tencent/cloud/plugin/mq/lane/kafka/TsfKafkaLaneAspectTest2.java @@ -0,0 +1,347 @@ +/* + * Tencent is pleased to support the open source community by making spring-cloud-tencent available. + * + * Copyright (C) 2021 Tencent. All rights reserved. + * + * Licensed under the BSD 3-Clause License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://opensource.org/licenses/BSD-3-Clause + * + * Unless required by applicable law or agreed to in writing, software distributed + * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package com.tencent.cloud.plugin.mq.lane.kafka; + +import java.lang.reflect.Field; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; + +import com.tencent.cloud.common.tsf.TsfContextUtils; +import com.tencent.cloud.plugin.mq.lane.tsf.TsfActiveLane; +import com.tencent.cloud.polaris.context.PolarisSDKContextManager; +import com.tencent.cloud.polaris.discovery.PolarisDiscoveryHandler; +import com.tencent.polaris.plugins.router.lane.LaneUtils; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.header.Header; +import org.aspectj.lang.ProceedingJoinPoint; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.MockedStatic; +import org.mockito.Mockito; + +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.support.Acknowledgment; +import org.springframework.kafka.support.converter.RecordMessageConverter; +import org.springframework.messaging.Message; +import org.springframework.messaging.support.MessageBuilder; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +/** + * Test for {@link KafkaLaneAspect}, with only tsf consul enabled. Instance not in lane. + */ +public class TsfKafkaLaneAspectTest2 { + + private KafkaLaneAspect kafkaLaneAspect; + private PolarisSDKContextManager polarisSDKContextManager; + private KafkaLaneProperties kafkaLaneProperties; + private TsfActiveLane tsfActiveLane; + private MockedStatic laneUtilsMockedStatic; + private MockedStatic tsfContextUtilsMockedStatic; + + @BeforeEach + public void setUp() throws Exception { + polarisSDKContextManager = mock(PolarisSDKContextManager.class); + kafkaLaneProperties = new KafkaLaneProperties(); + tsfActiveLane = new TsfActiveLane(polarisSDKContextManager, mock(PolarisDiscoveryHandler.class)); + laneUtilsMockedStatic = Mockito.mockStatic(LaneUtils.class); + tsfContextUtilsMockedStatic = Mockito.mockStatic(TsfContextUtils.class); + tsfContextUtilsMockedStatic.when(TsfContextUtils::isOnlyTsfConsulEnabled).thenReturn(true); + + // reset currentGroupLaneIds + Field currentGroupLaneIdsField = TsfActiveLane.class.getDeclaredField("currentGroupLaneIds"); + currentGroupLaneIdsField.setAccessible(true); + currentGroupLaneIdsField.set(tsfActiveLane, new HashSet<>()); + + kafkaLaneAspect = new KafkaLaneAspect(polarisSDKContextManager, kafkaLaneProperties, tsfActiveLane); + } + + @AfterEach + public void tearDown() { + laneUtilsMockedStatic.close(); + tsfContextUtilsMockedStatic.close(); + } + + @Test + public void testProducerAspectWhenLaneDisabled() throws Throwable { + // Given + kafkaLaneProperties.setLaneOn(false); + ProceedingJoinPoint pjp = mock(ProceedingJoinPoint.class); + Object[] args = new Object[] {mock(ProducerRecord.class)}; + when(pjp.getArgs()).thenReturn(args); + when(pjp.proceed(args)).thenReturn("result"); + + // When + Object result = kafkaLaneAspect.aroundProducerMessage(pjp); + + // Then + assertThat(result).isEqualTo("result"); + verify(pjp).proceed(args); + } + + @Test + public void testProducerAspectWhenNoLaneId() throws Throwable { + // Given + kafkaLaneProperties.setLaneOn(true); + laneUtilsMockedStatic.when(() -> LaneUtils.fetchLaneByCaller(any(), any(), any())).thenReturn(null); + + ProceedingJoinPoint pjp = mock(ProceedingJoinPoint.class); + Object[] args = new Object[] {mock(ProducerRecord.class)}; + when(pjp.getArgs()).thenReturn(args); + when(pjp.proceed(args)).thenReturn("result"); + + // When + Object result = kafkaLaneAspect.aroundProducerMessage(pjp); + + // Then + assertThat(result).isEqualTo("result"); + verify(pjp).proceed(args); + } + + @Test + public void testProducerAspectWithProducerRecord() throws Throwable { + // Given + kafkaLaneProperties.setLaneOn(true); + String laneId = "test-lane-id"; + laneUtilsMockedStatic.when(() -> LaneUtils.fetchLaneByCaller(any(), any(), any())).thenReturn(laneId); + + ProceedingJoinPoint pjp = mock(ProceedingJoinPoint.class); + KafkaTemplate kafkaTemplate = mock(KafkaTemplate.class); + ProducerRecord producerRecord = new ProducerRecord<>("test-topic", "test-value"); + + when(pjp.getTarget()).thenReturn(kafkaTemplate); + when(pjp.getArgs()).thenReturn(new Object[] {producerRecord}); + when(kafkaTemplate.send(producerRecord)).thenReturn(null); + + // When + kafkaLaneAspect.aroundProducerMessage(pjp); + + // Then + Iterator
headers = producerRecord.headers().headers(tsfActiveLane.getLaneHeaderKey()).iterator(); + assertThat(headers.hasNext()).isTrue(); + Header laneHeader = headers.next(); + assertThat(new String(laneHeader.value(), StandardCharsets.UTF_8)).isEqualTo(laneId); + } + + @Test + public void testProducerAspectWithMessage() throws Throwable { + // Given + kafkaLaneProperties.setLaneOn(true); + String laneId = "test-lane-id"; + laneUtilsMockedStatic.when(() -> LaneUtils.fetchLaneByCaller(any(), any(), any())).thenReturn(laneId); + + ProceedingJoinPoint pjp = mock(ProceedingJoinPoint.class); + KafkaTemplate kafkaTemplate = mock(KafkaTemplate.class); + RecordMessageConverter recordMessageConverter = mock(RecordMessageConverter.class); + Message message = MessageBuilder.withPayload("test-payload").build(); + ProducerRecord producerRecord = new ProducerRecord<>("test-topic", "test-value"); + + when(pjp.getTarget()).thenReturn(kafkaTemplate); + when(pjp.getArgs()).thenReturn(new Object[] {message}); + when(kafkaTemplate.getDefaultTopic()).thenReturn("test-topic"); + when(kafkaTemplate.send(producerRecord)).thenReturn(null); + when(kafkaTemplate.getMessageConverter()).thenReturn(recordMessageConverter); + + // When + kafkaLaneAspect.aroundProducerMessage(pjp); + } + + @Test + public void testConsumerAspectWhenLaneDisabled() throws Throwable { + // Given + kafkaLaneProperties.setLaneOn(false); + ProceedingJoinPoint pjp = mock(ProceedingJoinPoint.class); + Object[] args = new Object[] {mock(ConsumerRecord.class)}; + when(pjp.getArgs()).thenReturn(args); + when(pjp.proceed(args)).thenReturn("result"); + + // When + Object result = kafkaLaneAspect.aroundConsumerMessage(pjp); + + // Then + assertThat(result).isEqualTo("result"); + verify(pjp).proceed(args); + } + + @Test + public void testConsumerAspectWithLaneHeader_LaneIdExist() throws Throwable { + // Given + kafkaLaneProperties.setLaneOn(true); + Field laneActiveMapField = TsfActiveLane.class.getDeclaredField("laneActiveMap"); + laneActiveMapField.setAccessible(true); + HashMap laneActiveMap = new HashMap<>(); + laneActiveMap.put("lane-test", true); + laneActiveMapField.set(tsfActiveLane, laneActiveMap); + String laneId = "tsf/lane-test"; // valid lane id + + ConsumerRecord consumerRecord = new ConsumerRecord<>("test-topic", 0, 0L, "key", "value"); + consumerRecord.headers().add(tsfActiveLane.getLaneHeaderKey(), laneId.getBytes(StandardCharsets.UTF_8)); + + ProceedingJoinPoint pjp = mock(ProceedingJoinPoint.class); + Object[] args = new Object[] {consumerRecord}; + when(pjp.getArgs()).thenReturn(args); + when(pjp.proceed(args)).thenReturn("result"); + + // When + Object result = kafkaLaneAspect.aroundConsumerMessage(pjp); + + // Then + assertThat(result).isEqualTo(KafkaLaneAspect.EMPTY_OBJECT); + } + + @Test + public void testConsumerAspectWithLaneHeader_LaneIdExist2() throws Throwable { + // Given + kafkaLaneProperties.setLaneOn(true); + Field laneActiveMapField = TsfActiveLane.class.getDeclaredField("laneActiveMap"); + laneActiveMapField.setAccessible(true); + HashMap laneActiveMap = new HashMap<>(); + laneActiveMap.put("lane-test", false); + laneActiveMapField.set(tsfActiveLane, laneActiveMap); + String laneId = "tsf/lane-test"; // valid lane id + + ConsumerRecord consumerRecord = new ConsumerRecord<>("test-topic", 0, 0L, "key", "value"); + consumerRecord.headers().add(tsfActiveLane.getLaneHeaderKey(), laneId.getBytes(StandardCharsets.UTF_8)); + + ProceedingJoinPoint pjp = mock(ProceedingJoinPoint.class); + Object[] args = new Object[] {consumerRecord}; + when(pjp.getArgs()).thenReturn(args); + when(pjp.proceed(args)).thenReturn("result"); + + // When + Object result = kafkaLaneAspect.aroundConsumerMessage(pjp); + + // Then + assertThat(result).isEqualTo(KafkaLaneAspect.EMPTY_OBJECT); + + kafkaLaneProperties.setMainConsumeLane(true); + result = kafkaLaneAspect.aroundConsumerMessage(pjp); + assertThat(result).isEqualTo("result"); // as mainConsumeLane is true + + // reset mainConsumeLane + kafkaLaneProperties.setMainConsumeLane(false); + } + + @Test + public void testGetConsumerRecordLaneIdFromHeader() { + // Given + ConsumerRecord consumerRecord = new ConsumerRecord<>("test-topic", 0, 0L, "key", "value"); + String expectedLaneId = "test-lane-id"; + consumerRecord.headers().add(tsfActiveLane.getLaneHeaderKey(), expectedLaneId.getBytes(StandardCharsets.UTF_8)); + + // When + String laneId = kafkaLaneAspect.getConsumerRecordLaneId(consumerRecord); + + // Then + assertThat(laneId).isEqualTo(expectedLaneId); + } + + @Test + public void testGetConsumerRecordLaneIdFromCallerLane() { + // Given + ConsumerRecord consumerRecord = new ConsumerRecord<>("test-topic", 0, 0L, "key", "value"); + String expectedLaneId = "lane-test"; + laneUtilsMockedStatic.when(() -> LaneUtils.getCallerLaneId()).thenReturn(expectedLaneId); + + // When + String laneId = kafkaLaneAspect.getConsumerRecordLaneId(consumerRecord); + + // Then + assertThat(laneId).isEqualTo("tsf/" + expectedLaneId); + } + + /** + * instance not in lane, act as baseline service. + */ + @Test + public void testIfConsumeWithNoLaneId() throws Exception { + + // Given + kafkaLaneProperties.setLaneConsumeMain(true); + // When + boolean shouldConsume = kafkaLaneAspect.ifConsume(""); + // Then + assertThat(shouldConsume).isTrue(); // baseline service + + // Given + kafkaLaneProperties.setLaneConsumeMain(false); + // When + shouldConsume = kafkaLaneAspect.ifConsume(""); + // Then + assertThat(shouldConsume).isTrue(); // unrelated to LaneConsumeMain + } + + @Test + public void testConsumerAspectWithBatchMessages() throws Throwable { + // Given + kafkaLaneProperties.setLaneOn(true); + + List messageList = new ArrayList<>(); + ConsumerRecord record1 = new ConsumerRecord<>("test-topic", 0, 0L, "key", "value1"); + record1.headers().add(tsfActiveLane.getLaneHeaderKey(), "lane1".getBytes(StandardCharsets.UTF_8)); + ConsumerRecord record2 = new ConsumerRecord<>("test-topic", 0, 1L, "key", "value2"); + record2.headers().add(tsfActiveLane.getLaneHeaderKey(), "lane2".getBytes(StandardCharsets.UTF_8)); + + messageList.add(record1); + messageList.add(record2); + + Acknowledgment acknowledgment = mock(Acknowledgment.class); + ProceedingJoinPoint pjp = mock(ProceedingJoinPoint.class); + Object[] args = new Object[] {messageList, acknowledgment}; + + when(pjp.getArgs()).thenReturn(args); + when(pjp.proceed(any())).thenReturn("result"); + + // When + Object result = kafkaLaneAspect.aroundConsumerMessage(pjp); + + // Then + assertThat(result).isEqualTo("result"); + verify(pjp).proceed(any()); + } + + @Test + public void testConsumerAspectWithEmptyBatch() throws Throwable { + // Given + kafkaLaneProperties.setLaneOn(true); + + List emptyList = new ArrayList<>(); + ProceedingJoinPoint pjp = mock(ProceedingJoinPoint.class); + Object[] args = new Object[] {emptyList}; + + when(pjp.getArgs()).thenReturn(args); + when(pjp.proceed(args)).thenReturn("result"); + + // When + Object result = kafkaLaneAspect.aroundConsumerMessage(pjp); + + // Then + assertThat(result).isEqualTo("result"); + verify(pjp).proceed(args); + } +} diff --git a/spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-mq-plugin/src/test/java/com/tencent/cloud/plugin/mq/lane/kafka/TsfKafkaLaneAspectTest3.java b/spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-mq-plugin/src/test/java/com/tencent/cloud/plugin/mq/lane/kafka/TsfKafkaLaneAspectTest3.java new file mode 100644 index 000000000..1fef929e8 --- /dev/null +++ b/spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-mq-plugin/src/test/java/com/tencent/cloud/plugin/mq/lane/kafka/TsfKafkaLaneAspectTest3.java @@ -0,0 +1,149 @@ +/* + * Tencent is pleased to support the open source community by making spring-cloud-tencent available. + * + * Copyright (C) 2021 Tencent. All rights reserved. + * + * Licensed under the BSD 3-Clause License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://opensource.org/licenses/BSD-3-Clause + * + * Unless required by applicable law or agreed to in writing, software distributed + * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package com.tencent.cloud.plugin.mq.lane.kafka; + +import java.lang.reflect.Field; +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; + +import com.tencent.cloud.common.tsf.TsfContextUtils; +import com.tencent.cloud.plugin.mq.lane.tsf.TsfActiveLane; +import com.tencent.cloud.polaris.context.PolarisSDKContextManager; +import com.tencent.cloud.polaris.discovery.PolarisDiscoveryHandler; +import com.tencent.polaris.plugins.router.lane.LaneUtils; +import com.tencent.polaris.specification.api.v1.traffic.manage.LaneProto; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.aspectj.lang.ProceedingJoinPoint; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.MockedStatic; +import org.mockito.Mockito; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Test for {@link KafkaLaneAspect}. + * Instance and service status change. + */ +public class TsfKafkaLaneAspectTest3 { + + private KafkaLaneAspect kafkaLaneAspect; + private PolarisSDKContextManager polarisSDKContextManager; + private KafkaLaneProperties kafkaLaneProperties; + private TsfActiveLane tsfActiveLane; + private LaneProto.LaneGroup group; + private MockedStatic laneUtilsMockedStatic; + private MockedStatic tsfContextUtilsMockedStatic; + + @BeforeEach + public void setUp() throws Exception { + polarisSDKContextManager = mock(PolarisSDKContextManager.class); + group = mock(LaneProto.LaneGroup.class); + kafkaLaneProperties = new KafkaLaneProperties(); + kafkaLaneProperties.setLaneOn(true); + tsfActiveLane = new TsfActiveLane(mock(PolarisSDKContextManager.class), mock(PolarisDiscoveryHandler.class)); + laneUtilsMockedStatic = Mockito.mockStatic(LaneUtils.class); + tsfContextUtilsMockedStatic = Mockito.mockStatic(TsfContextUtils.class); + + + kafkaLaneAspect = new KafkaLaneAspect(polarisSDKContextManager, kafkaLaneProperties, tsfActiveLane); + } + + @AfterEach + public void tearDown() { + laneUtilsMockedStatic.close(); + tsfContextUtilsMockedStatic.close(); + } + + @Test + public void testConsumerAspectWithLaneHeader_toInLane() throws Throwable { + // not in lane + + Field laneActiveMapField = TsfActiveLane.class.getDeclaredField("laneActiveMap"); + laneActiveMapField.setAccessible(true); + HashMap laneActiveMap = new HashMap<>(); + laneActiveMap.put("lane-name", true); + laneActiveMapField.set(tsfActiveLane, laneActiveMap); + + String laneId = "tsf/lane-name"; // valid lane id + ConsumerRecord consumerRecord = new ConsumerRecord<>("test-topic", 0, 0L, "key", "value"); + consumerRecord.headers().add(tsfActiveLane.getLaneHeaderKey(), laneId.getBytes(StandardCharsets.UTF_8)); + + ProceedingJoinPoint pjp = mock(ProceedingJoinPoint.class); + Object[] args = new Object[] {consumerRecord}; + when(pjp.getArgs()).thenReturn(args); + when(pjp.proceed(args)).thenReturn("result"); + + // act + Object result = kafkaLaneAspect.aroundConsumerMessage(pjp); + + // verify + assertThat(result).isEqualTo(KafkaLaneAspect.EMPTY_OBJECT); // not in lane, not consume + + // in lane + Field currentGroupLaneIdsField = TsfActiveLane.class.getDeclaredField("currentGroupLaneIds"); + currentGroupLaneIdsField.setAccessible(true); + currentGroupLaneIdsField.set(tsfActiveLane, new HashSet<>(Collections.singletonList("lane-name"))); + + result = kafkaLaneAspect.aroundConsumerMessage(pjp); + assertThat(result).isEqualTo("result"); + } + + @Test + public void testConsumerAspectWithLaneHeader_toNotInLane() throws Throwable { + // in lane + String laneId = "tsf/lane-name"; // valid lane id + + Field currentGroupLaneIdsField = TsfActiveLane.class.getDeclaredField("currentGroupLaneIds"); + currentGroupLaneIdsField.setAccessible(true); + currentGroupLaneIdsField.set(tsfActiveLane, new HashSet<>(Collections.singletonList("lane-name"))); + + Field laneActiveMapField = TsfActiveLane.class.getDeclaredField("laneActiveMap"); + laneActiveMapField.setAccessible(true); + HashMap laneActiveMap = new HashMap<>(); + laneActiveMap.put("lane-name", true); + laneActiveMapField.set(tsfActiveLane, laneActiveMap); + + + ConsumerRecord consumerRecord = new ConsumerRecord<>("test-topic", 0, 0L, "key", "value"); + consumerRecord.headers().add(tsfActiveLane.getLaneHeaderKey(), laneId.getBytes(StandardCharsets.UTF_8)); + + ProceedingJoinPoint pjp = mock(ProceedingJoinPoint.class); + Object[] args = new Object[] {consumerRecord}; + when(pjp.getArgs()).thenReturn(args); + when(pjp.proceed(args)).thenReturn("result"); + + // act + Object result = kafkaLaneAspect.aroundConsumerMessage(pjp); + + // verify + assertThat(result).isEqualTo("result"); + + + // not in lane + currentGroupLaneIdsField.set(tsfActiveLane, new HashSet<>()); + + result = kafkaLaneAspect.aroundConsumerMessage(pjp); + assertThat(result).isEqualTo(KafkaLaneAspect.EMPTY_OBJECT); + } +} diff --git a/spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-mq-plugin/src/test/java/com/tencent/cloud/plugin/mq/lane/tsf/TsfActiveLaneTest.java b/spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-mq-plugin/src/test/java/com/tencent/cloud/plugin/mq/lane/tsf/TsfActiveLaneTest.java index 0aeed6f31..c5d2f8511 100644 --- a/spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-mq-plugin/src/test/java/com/tencent/cloud/plugin/mq/lane/tsf/TsfActiveLaneTest.java +++ b/spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-mq-plugin/src/test/java/com/tencent/cloud/plugin/mq/lane/tsf/TsfActiveLaneTest.java @@ -17,6 +17,7 @@ package com.tencent.cloud.plugin.mq.lane.tsf; +import java.lang.reflect.Field; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; @@ -26,6 +27,8 @@ import java.util.Map; import java.util.Set; import java.util.TreeSet; +import com.tencent.cloud.common.util.JacksonUtils; +import com.tencent.cloud.plugin.mq.lane.kafka.KafkaLaneProperties; import com.tencent.cloud.polaris.context.PolarisSDKContextManager; import com.tencent.cloud.polaris.discovery.PolarisDiscoveryHandler; import com.tencent.polaris.api.plugin.compose.Extensions; @@ -46,8 +49,6 @@ import org.springframework.test.util.ReflectionTestUtils; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; /** @@ -58,9 +59,11 @@ public class TsfActiveLaneTest { private TsfActiveLane tsfActiveLane; private PolarisSDKContextManager polarisSDKContextManager; private PolarisDiscoveryHandler discoveryClient; + private KafkaLaneProperties kafkaLaneProperties; private SDKContext sdkContext; private Extensions extensions; private MockedStatic laneUtilsMockedStatic; + private MockedStatic jacksonUtilsMockedStatic; @BeforeEach public void setUp() { @@ -68,14 +71,19 @@ public class TsfActiveLaneTest { discoveryClient = mock(PolarisDiscoveryHandler.class); sdkContext = mock(SDKContext.class); extensions = mock(Extensions.class); + kafkaLaneProperties = new KafkaLaneProperties(); laneUtilsMockedStatic = Mockito.mockStatic(LaneUtils.class); + jacksonUtilsMockedStatic = Mockito.mockStatic(JacksonUtils.class); + jacksonUtilsMockedStatic.when(() -> JacksonUtils.serialize2Json(any())).thenReturn("{}"); + when(polarisSDKContextManager.getSDKContext()).thenReturn(sdkContext); when(sdkContext.getExtensions()).thenReturn(extensions); tsfActiveLane = new TsfActiveLane(polarisSDKContextManager, discoveryClient); + // Set up field values using reflection ReflectionTestUtils.setField(tsfActiveLane, "tsfNamespaceId", "test-namespace"); ReflectionTestUtils.setField(tsfActiveLane, "tsfGroupId", "test-group"); @@ -86,25 +94,16 @@ public class TsfActiveLaneTest { @AfterEach public void tearDown() { laneUtilsMockedStatic.close(); - } - - @Test - public void testConstructorInitialization() { - // Verify that constructor properly initializes dependencies - assertThat(tsfActiveLane).isNotNull(); - verify(polarisSDKContextManager, times(1)).getSDKContext(); - verify(sdkContext, times(1)).getExtensions(); + jacksonUtilsMockedStatic.close(); } @Test public void testCallbackWithEmptyInstances() { // Given List currentInstances = Collections.emptyList(); - List addInstances = Collections.emptyList(); - List deleteInstances = Collections.emptyList(); // When - tsfActiveLane.callback(currentInstances, addInstances, deleteInstances); + tsfActiveLane.callback(currentInstances); // Then // Should not throw any exceptions and handle empty instances gracefully @@ -236,6 +235,68 @@ public class TsfActiveLaneTest { assertThat(activeGroupSet).contains("group1"); } + @Test + public void testCallback() throws Throwable { + Field tsfNamespaceIdField = TsfActiveLane.class.getDeclaredField("tsfNamespaceId"); + tsfNamespaceIdField.setAccessible(true); + tsfNamespaceIdField.set(tsfActiveLane, "ns1"); + + Field tsfGroupIdField = TsfActiveLane.class.getDeclaredField("tsfGroupId"); + tsfGroupIdField.setAccessible(true); + tsfGroupIdField.set(tsfActiveLane, "group1"); + + Field tsfApplicationIdField = TsfActiveLane.class.getDeclaredField("tsfApplicationId"); + tsfApplicationIdField.setAccessible(true); + tsfApplicationIdField.set(tsfActiveLane, "app1"); + + // not in lane + assertThat(tsfActiveLane.getCurrentGroupLaneIds()).isEmpty(); + + // in lane + // given + LaneProto.LaneGroup group = mock(LaneProto.LaneGroup.class); + LaneProto.LaneRule laneRule = mock(LaneProto.LaneRule.class); + Map metadataMap = new HashMap<>(); + metadataMap.put("ns1,app1", TsfMetadataConstants.TSF_NAMESPACE_ID + "," + TsfMetadataConstants.TSF_APPLICATION_ID); + when(group.getMetadataMap()).thenReturn(metadataMap); + when(group.getRulesList()).thenReturn(Collections.singletonList(laneRule)); + when(laneRule.getLabelKey()).thenReturn(TsfMetadataConstants.TSF_GROUP_ID); + when(laneRule.getDefaultLabelValue()).thenReturn("group1"); + when(laneRule.getId()).thenReturn("lane1"); + laneUtilsMockedStatic.when(() -> LaneUtils.getLaneGroups(any(), any())) + .thenReturn(Collections.singletonList(group)); + + Map metadata = new HashMap<>(); + metadata.put("TSF_NAMESPACE_ID", "ns1"); + metadata.put("TSF_GROUP_ID", "group1"); + metadata.put("TSF_APPLICATION_ID", "app1"); + + Instance instance = mock(Instance.class); + when(instance.getMetadata()).thenReturn(metadata); + + // act + tsfActiveLane.callback(Collections.singletonList(instance)); + + assertThat(tsfActiveLane.getCurrentGroupLaneIds().contains("lane1")).isTrue(); + + // instance not in lane, change the rule + when(laneRule.getDefaultLabelValue()).thenReturn("group2"); + tsfActiveLane.freshLaneStatus(); // rule listener will call this method + assertThat(tsfActiveLane.getCurrentGroupLaneIds().contains("lane1")).isFalse(); + + // reset, instance in lane + when(laneRule.getDefaultLabelValue()).thenReturn("group1"); + tsfActiveLane.freshLaneStatus(); // rule listener will call this method + assertThat(tsfActiveLane.getCurrentGroupLaneIds().contains("lane1")).isTrue(); + + // service not in lane + metadataMap.clear(); + metadataMap.put("ns1,app2", TsfMetadataConstants.TSF_NAMESPACE_ID + "," + TsfMetadataConstants.TSF_APPLICATION_ID); + when(laneRule.getDefaultLabelValue()).thenReturn("group2"); + tsfActiveLane.freshLaneStatus(); // rule listener will call this method + assertThat(tsfActiveLane.getCurrentGroupLaneIds().contains("lane1")).isFalse(); + } + private Map createMetadata(String namespaceId, String groupId, String applicationId) { Map metadata = new HashMap<>(); metadata.put("TSF_NAMESPACE_ID", namespaceId);