fix: kafka lane support dynamic lane tag. (#1784)

pull/1792/head
shedfreewu 2 weeks ago
parent abc2605c0b
commit dad7036a2c

@ -21,3 +21,4 @@
- [fix: fix lane router property name.](https://github.com/Tencent/spring-cloud-tencent/pull/1789)
- [feat: support kafka lane.](https://github.com/Tencent/spring-cloud-tencent/pull/1765)
- [fix: ApplicationContextAwareUtils may not be ready in postProcessAfterInitialization.](https://github.com/Tencent/spring-cloud-tencent/pull/1779)
- [fix: kafka lane support dynamic lane tag.](https://github.com/Tencent/spring-cloud-tencent/pull/1784)

@ -0,0 +1,53 @@
/*
* Tencent is pleased to support the open source community by making spring-cloud-tencent available.
*
* Copyright (C) 2021 Tencent. All rights reserved.
*
* Licensed under the BSD 3-Clause License (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.tencent.cloud.plugin.mq.lane;
import java.util.List;
import com.tencent.cloud.common.metadata.MetadataContext;
import com.tencent.polaris.api.plugin.registry.AbstractResourceEventListener;
import com.tencent.polaris.api.pojo.Instance;
import com.tencent.polaris.api.pojo.RegistryCacheValue;
import com.tencent.polaris.api.pojo.ServiceEventKey;
import com.tencent.polaris.api.utils.StringUtils;
import com.tencent.polaris.client.pojo.ServiceInstancesByProto;
public abstract class AbstractActiveLane extends AbstractResourceEventListener {
public abstract boolean ifConsume(String laneId, MqLaneProperties mqLaneProperties);
public abstract String getLaneHeaderKey();
public String formatLaneId(String laneId) {
return laneId;
}
public abstract void callback(List<Instance> currentServiceInstances);
@Override
public void onResourceUpdated(ServiceEventKey svcEventKey, RegistryCacheValue oldValue, RegistryCacheValue newValue) {
if (newValue.getEventType() == ServiceEventKey.EventType.INSTANCE
&& newValue instanceof ServiceInstancesByProto
&& StringUtils.equals(svcEventKey.getService(), MetadataContext.LOCAL_SERVICE)) {
ServiceInstancesByProto newIns = (ServiceInstancesByProto) newValue;
callback(newIns.getInstances());
}
}
}

@ -15,18 +15,18 @@
* specific language governing permissions and limitations under the License.
*/
package com.tencent.cloud.plugin.mq.lane.tsf;
package com.tencent.cloud.plugin.mq.lane;
import com.tencent.polaris.api.plugin.registry.AbstractResourceEventListener;
import com.tencent.polaris.api.pojo.RegistryCacheValue;
import com.tencent.polaris.api.pojo.ServiceEventKey;
public class TsfLaneRuleListener extends AbstractResourceEventListener {
public class LaneRuleListener extends AbstractResourceEventListener {
private final TsfActiveLane tsfActiveLane;
private final Runnable refreshAction;
public TsfLaneRuleListener(TsfActiveLane tsfActiveLane) {
this.tsfActiveLane = tsfActiveLane;
public LaneRuleListener(Runnable refreshAction) {
this.refreshAction = refreshAction;
}
@Override
@ -34,8 +34,7 @@ public class TsfLaneRuleListener extends AbstractResourceEventListener {
if (svcEventKey.getEventType() != ServiceEventKey.EventType.LANE_RULE) {
return;
}
tsfActiveLane.freshLaneStatus();
refreshAction.run();
}
@Override
@ -43,8 +42,7 @@ public class TsfLaneRuleListener extends AbstractResourceEventListener {
if (svcEventKey.getEventType() != ServiceEventKey.EventType.LANE_RULE) {
return;
}
tsfActiveLane.freshLaneStatus();
refreshAction.run();
}
@Override
@ -52,7 +50,6 @@ public class TsfLaneRuleListener extends AbstractResourceEventListener {
if (svcEventKey.getEventType() != ServiceEventKey.EventType.LANE_RULE) {
return;
}
tsfActiveLane.freshLaneStatus();
refreshAction.run();
}
}

@ -0,0 +1,29 @@
/*
* Tencent is pleased to support the open source community by making spring-cloud-tencent available.
*
* Copyright (C) 2021 Tencent. All rights reserved.
*
* Licensed under the BSD 3-Clause License (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.tencent.cloud.plugin.mq.lane;
public interface MqLaneProperties {
Boolean getLaneOn();
Boolean getLaneConsumeMain();
Boolean getMainConsumeLane();
Boolean getAutoSetHeader();
}

@ -0,0 +1,210 @@
/*
* Tencent is pleased to support the open source community by making spring-cloud-tencent available.
*
* Copyright (C) 2021 Tencent. All rights reserved.
*
* Licensed under the BSD 3-Clause License (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.tencent.cloud.plugin.mq.lane;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import com.tencent.cloud.common.metadata.MetadataContext;
import com.tencent.cloud.common.util.JacksonUtils;
import com.tencent.cloud.polaris.context.PolarisSDKContextManager;
import com.tencent.cloud.polaris.discovery.PolarisDiscoveryHandler;
import com.tencent.polaris.api.config.Configuration;
import com.tencent.polaris.api.config.consumer.ConsumerConfig;
import com.tencent.polaris.api.config.consumer.ServiceRouterConfig;
import com.tencent.polaris.api.plugin.compose.Extensions;
import com.tencent.polaris.api.pojo.Instance;
import com.tencent.polaris.api.pojo.ServiceKey;
import com.tencent.polaris.api.utils.CollectionUtils;
import com.tencent.polaris.api.utils.StringUtils;
import com.tencent.polaris.client.api.SDKContext;
import com.tencent.polaris.plugins.router.lane.BaseLaneMode;
import com.tencent.polaris.plugins.router.lane.LaneRouter;
import com.tencent.polaris.plugins.router.lane.LaneRouterConfig;
import com.tencent.polaris.plugins.router.lane.LaneUtils;
import com.tencent.polaris.specification.api.v1.traffic.manage.LaneProto;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.client.serviceregistry.Registration;
public class PolarisActiveLane extends AbstractActiveLane implements InitializingBean {
private static final Logger LOG = LoggerFactory.getLogger(PolarisActiveLane.class);
private final PolarisSDKContextManager polarisSDKContextManager;
private final PolarisDiscoveryHandler discoveryClient;
@Value("${spring.application.name:}")
private String springApplicationName;
/**
* current instance lane tag, related to baseLaneMode.
*/
private volatile String instanceLaneTag = "";
private volatile boolean serviceInLane = false;
private volatile List<LaneProto.LaneGroup> groups;
private Registration registration;
private BaseLaneMode baseLaneMode = BaseLaneMode.ONLY_UNTAGGED_INSTANCE;
public PolarisActiveLane(PolarisSDKContextManager polarisSDKContextManager,
PolarisDiscoveryHandler discoveryClient, Registration registration) {
this.polarisSDKContextManager = polarisSDKContextManager;
this.discoveryClient = discoveryClient;
this.registration = registration;
Optional.ofNullable(polarisSDKContextManager).map(PolarisSDKContextManager::getSDKContext)
.map(SDKContext::getExtensions).map(Extensions::getLocalRegistry)
.ifPresent(localRegistry -> localRegistry.registerResourceListener(new LaneRuleListener(this::freshLaneStatus)));
Optional.ofNullable(polarisSDKContextManager).map(PolarisSDKContextManager::getSDKContext)
.map(SDKContext::getExtensions).map(Extensions::getLocalRegistry)
.ifPresent(localRegistry -> localRegistry.registerResourceListener(this));
Optional.ofNullable(polarisSDKContextManager).map(PolarisSDKContextManager::getSDKContext)
.map(SDKContext::getConfig).map(Configuration::getConsumer).map(ConsumerConfig::getServiceRouter).ifPresent(serviceRouterConfig -> {
LaneRouterConfig laneRouterConfig = serviceRouterConfig.getPluginConfig(ServiceRouterConfig.DEFAULT_ROUTER_LANE, LaneRouterConfig.class);
if (laneRouterConfig != null) {
baseLaneMode = laneRouterConfig.getBaseLaneMode();
}
});
}
@Override
public void afterPropertiesSet() {
// get instances to trigger callback when instances change
discoveryClient.getHealthyInstances(springApplicationName);
}
@Override
public void callback(List<Instance> currentServiceInstances) {
if (LOG.isDebugEnabled()) {
LOG.debug("currentServices: {}", JacksonUtils.serialize2Json(currentServiceInstances));
}
freshWhenInstancesChange(currentServiceInstances);
if (LOG.isDebugEnabled()) {
LOG.debug("current instanceLaneTag:{}, serviceInLane: {}, baseLaneMode:{}", instanceLaneTag, serviceInLane, baseLaneMode);
}
}
private void freshWhenInstancesChange(List<Instance> currentServices) {
freshLaneStatus();
String tempInstanceLaneTag = "";
// get all active groups
if (CollectionUtils.isNotEmpty(currentServices)) {
for (Instance healthService : currentServices) {
if (StringUtils.equals(healthService.getId(), registration.getInstanceId())) {
tempInstanceLaneTag = healthService.getMetadata().get("lane");
break;
}
}
}
if (BaseLaneMode.ONLY_UNTAGGED_INSTANCE.equals(baseLaneMode)) {
instanceLaneTag = tempInstanceLaneTag;
}
else {
// if baseLaneMode is EXCLUDE_ENABLED_LANE_INSTANCE, check if the instance lane tag is in the lane
boolean laneTagExist = false;
for (LaneProto.LaneGroup group : getGroups()) {
for (LaneProto.LaneRule rule : group.getRulesList()) {
if (StringUtils.equals(rule.getDefaultLabelValue(), tempInstanceLaneTag)) {
laneTagExist = true;
}
}
}
instanceLaneTag = laneTagExist ? tempInstanceLaneTag : "";
}
}
/**
* update lane status.
*/
public void freshLaneStatus() {
ServiceKey localService = new ServiceKey(com.tencent.cloud.common.metadata.MetadataContext.LOCAL_NAMESPACE,
com.tencent.cloud.common.metadata.MetadataContext.LOCAL_SERVICE);
groups = LaneUtils.getLaneGroups(localService, polarisSDKContextManager.getSDKContext().getExtensions());
serviceInLane = CollectionUtils.isNotEmpty(groups);
}
public boolean currentInstanceInLane() {
return StringUtils.isNotEmpty(instanceLaneTag) && serviceInLane;
}
public String getInstanceLaneTag() {
return instanceLaneTag;
}
public List<LaneProto.LaneGroup> getGroups() {
return groups == null ? Collections.emptyList() : groups;
}
@Override
public boolean ifConsume(String messageLaneId, MqLaneProperties mqLaneProperties) {
// message has no lane id
if (StringUtils.isEmpty(messageLaneId)) {
if (!currentInstanceInLane()) {
// baseline service, consume directly
return true;
}
else {
// lane listener consumes baseline message
return mqLaneProperties.getLaneConsumeMain();
}
}
else {
LaneUtils.setCallerLaneId(messageLaneId);
// message has lane id
if (!currentInstanceInLane()) {
// baseline service
return mqLaneProperties.getMainConsumeLane();
}
else {
// whether the message lane id is the same as the lane id of the listener
for (LaneProto.LaneGroup group : getGroups()) {
for (LaneProto.LaneRule rule : group.getRulesList()) {
if (StringUtils.equals(messageLaneId, LaneUtils.buildStainLabel(rule))
&& StringUtils.equals(rule.getDefaultLabelValue(), getInstanceLaneTag())) {
return true;
}
}
}
return false;
}
}
}
@Override
public String getLaneHeaderKey() {
return MetadataContext.DEFAULT_TRANSITIVE_PREFIX + LaneRouter.TRAFFIC_STAIN_LABEL;
}
}

@ -19,23 +19,16 @@ package com.tencent.cloud.plugin.mq.lane.kafka;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import com.tencent.cloud.common.metadata.MetadataContext;
import com.tencent.cloud.common.metadata.MetadataContextHolder;
import com.tencent.cloud.common.tsf.TsfContextUtils;
import com.tencent.cloud.plugin.mq.lane.tsf.TsfActiveLane;
import com.tencent.cloud.plugin.mq.lane.AbstractActiveLane;
import com.tencent.cloud.polaris.context.PolarisSDKContextManager;
import com.tencent.polaris.api.pojo.ServiceKey;
import com.tencent.polaris.api.utils.StringUtils;
import com.tencent.polaris.client.api.SDKContext;
import com.tencent.polaris.plugins.router.lane.LaneRouter;
import com.tencent.polaris.plugins.router.lane.LaneUtils;
import com.tencent.polaris.specification.api.v1.traffic.manage.LaneProto;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
@ -47,7 +40,6 @@ import org.aspectj.lang.annotation.Pointcut;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.core.annotation.Order;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.Acknowledgment;
@ -64,24 +56,19 @@ public class KafkaLaneAspect {
*/
public static final Object EMPTY_OBJECT = new Object();
private static final String TSF_LANE_ID = "tsf_laneId";
private final PolarisSDKContextManager polarisSDKContextManager;
private final KafkaLaneProperties kafkaLaneProperties;
private final TsfActiveLane tsfActiveLane;
private final AbstractActiveLane activeLane;
private final String laneHeaderKey;
@Value("${spring.cloud.tencent.metadata.content.lane:}")
private String lane;
public KafkaLaneAspect(PolarisSDKContextManager polarisSDKContextManager, KafkaLaneProperties kafkaLaneProperties, TsfActiveLane tsfActiveLane) {
public KafkaLaneAspect(PolarisSDKContextManager polarisSDKContextManager, KafkaLaneProperties kafkaLaneProperties, AbstractActiveLane activeLane) {
this.polarisSDKContextManager = polarisSDKContextManager;
this.kafkaLaneProperties = kafkaLaneProperties;
this.tsfActiveLane = tsfActiveLane;
laneHeaderKey = TsfContextUtils.isOnlyTsfConsulEnabled() ? TSF_LANE_ID : MetadataContext.DEFAULT_TRANSITIVE_PREFIX + LaneRouter.TRAFFIC_STAIN_LABEL;
this.activeLane = activeLane;
laneHeaderKey = activeLane.getLaneHeaderKey();
}
@Pointcut("execution(* org.springframework.kafka.core.KafkaTemplate.send(..))")
@ -263,11 +250,9 @@ public class KafkaLaneAspect {
// falls back to the laneId from the metadata context (set by the user's aspect) if the header is empty
if (StringUtils.isBlank(laneId)) {
laneId = LaneUtils.getCallerLaneId();
// format laneId
if (laneId != null && !laneId.contains("/") && laneId.startsWith("lane-")) {
laneId = "tsf/" + laneId;
}
}
// format laneId
laneId = activeLane.formatLaneId(laneId);
return laneId;
}
@ -277,91 +262,6 @@ public class KafkaLaneAspect {
* @return whether to consume.
*/
boolean ifConsume(String messageLaneId) {
if (TsfContextUtils.isOnlyTsfConsulEnabled() && tsfActiveLane != null) {
return ifConsumeInTsf(messageLaneId);
}
else {
return ifConsumeInPolaris(messageLaneId);
}
}
private boolean ifConsumeInTsf(String originMessageLaneId) {
String laneId = originMessageLaneId;
if (laneId != null && laneId.contains("/")) {
laneId = laneId.split("/")[1];
}
Set<String> groupLaneIdSet = tsfActiveLane.getCurrentGroupLaneIds();
// message has no lane id
if (StringUtils.isEmpty(laneId)) {
if (groupLaneIdSet.isEmpty()) {
// baseline service, consume directly
return true;
}
else {
// lane listener consumes baseline message
return this.kafkaLaneProperties.getLaneConsumeMain();
}
}
else {
LaneUtils.setCallerLaneId(originMessageLaneId);
// message has lane id
if (groupLaneIdSet.isEmpty()) {
// baseline service
// message carries lane id but the current service's lane has no deployment groups, consume baseline
boolean consume = !tsfActiveLane.isLaneExist(laneId);
// message carries lane id, but the current service's lane has deployment groups but is not active (or manually taken offline), consume baseline based on switch configuration, default is not to consume
consume = consume ||
(this.kafkaLaneProperties.getMainConsumeLane() &&
tsfActiveLane.isLaneExist(laneId) &&
!tsfActiveLane.isActiveLane(laneId)
);
return consume;
}
else {
return groupLaneIdSet.contains(laneId);
}
}
}
private boolean ifConsumeInPolaris(String messageLaneId) {
// message has no lane id
if (StringUtils.isEmpty(messageLaneId)) {
if (StringUtils.isEmpty(lane)) {
// baseline service, consume directly
return true;
}
else {
// lane listener consumes baseline message
return this.kafkaLaneProperties.getLaneConsumeMain();
}
}
else {
LaneUtils.setCallerLaneId(messageLaneId);
// message has lane id
if (StringUtils.isEmpty(lane)) {
// baseline service
return this.kafkaLaneProperties.getMainConsumeLane();
}
else {
ServiceKey localService = new ServiceKey(com.tencent.cloud.common.metadata.MetadataContext.LOCAL_NAMESPACE,
com.tencent.cloud.common.metadata.MetadataContext.LOCAL_SERVICE);
Collection<LaneProto.LaneGroup> groups = LaneUtils.getLaneGroups(localService, polarisSDKContextManager.getSDKContext().getExtensions());
// whether the message lane id is the same as the lane id of the listener
for (LaneProto.LaneGroup group : groups) {
for (LaneProto.LaneRule rule : group.getRulesList()) {
if (StringUtils.equals(messageLaneId, LaneUtils.buildStainLabel(rule))
&& StringUtils.equals(rule.getDefaultLabelValue(), lane)) {
return true;
}
}
}
return false;
}
}
return activeLane.ifConsume(messageLaneId, kafkaLaneProperties);
}
}

@ -17,15 +17,17 @@
package com.tencent.cloud.plugin.mq.lane.kafka;
import com.tencent.cloud.common.tsf.ConditionalOnOnlyTsfConsulEnabled;
import com.tencent.cloud.common.tsf.TsfContextUtils;
import com.tencent.cloud.plugin.mq.lane.AbstractActiveLane;
import com.tencent.cloud.plugin.mq.lane.PolarisActiveLane;
import com.tencent.cloud.plugin.mq.lane.tsf.TsfActiveLane;
import com.tencent.cloud.polaris.context.PolarisSDKContextManager;
import com.tencent.cloud.polaris.discovery.PolarisDiscoveryHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.client.serviceregistry.Registration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@ -36,15 +38,21 @@ public class KafkaLaneAspectConfiguration {
@Bean
@ConditionalOnClass(name = {"org.springframework.kafka.core.KafkaTemplate"})
public KafkaLaneAspect kafkaLaneAspect(PolarisSDKContextManager polarisSDKContextManager,
KafkaLaneProperties kafkaLaneProperties, @Autowired(required = false) TsfActiveLane tsfActiveLane) {
return new KafkaLaneAspect(polarisSDKContextManager, kafkaLaneProperties, tsfActiveLane);
KafkaLaneProperties kafkaLaneProperties,
AbstractActiveLane activeLane) {
return new KafkaLaneAspect(polarisSDKContextManager, kafkaLaneProperties, activeLane);
}
@Bean
@ConditionalOnClass(name = {"org.springframework.kafka.core.KafkaTemplate"})
@ConditionalOnMissingBean
@ConditionalOnOnlyTsfConsulEnabled
public TsfActiveLane tsfActiveLane(PolarisSDKContextManager polarisSDKContextManager, PolarisDiscoveryHandler discoveryClient) {
return new TsfActiveLane(polarisSDKContextManager, discoveryClient);
public AbstractActiveLane activeLane(PolarisSDKContextManager polarisSDKContextManager,
PolarisDiscoveryHandler discoveryClient, Registration registration) {
if (TsfContextUtils.isOnlyTsfConsulEnabled()) {
return new TsfActiveLane(polarisSDKContextManager, discoveryClient);
}
else {
return new PolarisActiveLane(polarisSDKContextManager, discoveryClient, registration);
}
}
}

@ -17,10 +17,12 @@
package com.tencent.cloud.plugin.mq.lane.kafka;
import com.tencent.cloud.plugin.mq.lane.MqLaneProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
@ConfigurationProperties(prefix = "spring.cloud.polaris.lane.kafka")
public class KafkaLaneProperties {
public class KafkaLaneProperties implements MqLaneProperties {
/**
* enable kafka lane.

@ -25,10 +25,11 @@ import java.util.Optional;
import java.util.Set;
import com.tencent.cloud.common.util.JacksonUtils;
import com.tencent.cloud.plugin.mq.lane.AbstractActiveLane;
import com.tencent.cloud.plugin.mq.lane.LaneRuleListener;
import com.tencent.cloud.plugin.mq.lane.MqLaneProperties;
import com.tencent.cloud.polaris.context.PolarisSDKContextManager;
import com.tencent.cloud.polaris.discovery.PolarisDiscoveryHandler;
import com.tencent.cloud.polaris.discovery.refresh.ServiceInstanceChangeCallback;
import com.tencent.cloud.polaris.discovery.refresh.ServiceInstanceChangeListener;
import com.tencent.polaris.api.plugin.compose.Extensions;
import com.tencent.polaris.api.pojo.Instance;
import com.tencent.polaris.api.pojo.ServiceKey;
@ -44,20 +45,22 @@ import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Value;
@ServiceInstanceChangeListener(serviceName = "${spring.application.name}")
public class TsfActiveLane implements ServiceInstanceChangeCallback, InitializingBean {
public class TsfActiveLane extends AbstractActiveLane implements InitializingBean {
private static final Logger LOG = LoggerFactory.getLogger(TsfActiveLane.class);
private static final String TSF_LANE_ID = "tsf_laneId";
private final PolarisSDKContextManager polarisSDKContextManager;
private final PolarisDiscoveryHandler discoveryClient;
/**
* Online deployment groups for this service (same namespace id and application id required).
*/
private volatile Set<String> activeGroupSet = new HashSet<>();
private volatile Set<String> currentGroupLaneIds = null;
private volatile Set<String> currentGroupLaneIds = new HashSet<>();
/**
* key: laneId.
* value: true - online, false - offline.
@ -81,7 +84,11 @@ public class TsfActiveLane implements ServiceInstanceChangeCallback, Initializin
this.discoveryClient = discoveryClient;
Optional.ofNullable(polarisSDKContextManager).map(PolarisSDKContextManager::getSDKContext)
.map(SDKContext::getExtensions).map(Extensions::getLocalRegistry)
.ifPresent(localRegistry -> localRegistry.registerResourceListener(new TsfLaneRuleListener(this)));
.ifPresent(localRegistry -> localRegistry.registerResourceListener(new LaneRuleListener(this::freshLaneStatus)));
Optional.ofNullable(polarisSDKContextManager).map(PolarisSDKContextManager::getSDKContext)
.map(SDKContext::getExtensions).map(Extensions::getLocalRegistry)
.ifPresent(localRegistry -> localRegistry.registerResourceListener(this));
}
@Override
@ -91,16 +98,16 @@ public class TsfActiveLane implements ServiceInstanceChangeCallback, Initializin
}
@Override
public void callback(List<Instance> currentServiceInstances, List<Instance> addServiceInstances, List<Instance> deleteServiceInstances) {
public void callback(List<Instance> currentServiceInstances) {
if (LOG.isDebugEnabled()) {
LOG.debug("ConsulServiceChangeCallback currentServices: {}", JacksonUtils.serialize2Json(currentServiceInstances));
LOG.debug("currentServices: {}", JacksonUtils.serialize2Json(currentServiceInstances));
LOG.debug("current namespaceId: {}, groupId: {}, applicationId: {}", tsfNamespaceId, tsfGroupId, tsfApplicationId);
}
freshWhenInstancesChange(currentServiceInstances);
if (LOG.isDebugEnabled()) {
LOG.info("current lane active status: {}", JacksonUtils.serialize2Json(laneActiveMap));
LOG.debug("current lane active status: {}", JacksonUtils.serialize2Json(laneActiveMap));
}
}
@ -116,7 +123,7 @@ public class TsfActiveLane implements ServiceInstanceChangeCallback, Initializin
String nsId = healthService.getMetadata().get("TSF_NAMESPACE_ID");
String groupId = healthService.getMetadata().get("TSF_GROUP_ID");
String applicationId = healthService.getMetadata().get("TSF_APPLICATION_ID");
if (tsfNamespaceId.equals(nsId) && tsfApplicationId.equals(applicationId) && StringUtils.isNotEmpty(groupId)) {
if (StringUtils.equals(tsfNamespaceId, nsId) && StringUtils.equals(tsfApplicationId, applicationId) && StringUtils.isNotEmpty(groupId)) {
currentActiveGroupSet.add(groupId);
}
}
@ -183,4 +190,61 @@ public class TsfActiveLane implements ServiceInstanceChangeCallback, Initializin
return currentGroupLaneIds;
}
@Override
public boolean ifConsume(String originMessageLaneId, MqLaneProperties mqLaneProperties) {
String laneId = originMessageLaneId;
if (laneId != null && laneId.contains("/")) {
laneId = laneId.split("/")[1];
}
Set<String> groupLaneIdSet = getCurrentGroupLaneIds();
// message has no lane id
if (StringUtils.isEmpty(laneId)) {
if (groupLaneIdSet.isEmpty()) {
// baseline service, consume directly
return true;
}
else {
// lane listener consumes baseline message
return mqLaneProperties.getLaneConsumeMain();
}
}
else {
LaneUtils.setCallerLaneId(originMessageLaneId);
// message has lane id
if (groupLaneIdSet.isEmpty()) {
// baseline service
// message carries lane id but the current service's lane has no deployment groups, consume baseline
boolean consume = !isLaneExist(laneId);
// message carries lane id, but the current service's lane has deployment groups but is not active (or manually taken offline), consume baseline based on switch configuration, default is not to consume
consume = consume ||
(mqLaneProperties.getMainConsumeLane() &&
isLaneExist(laneId) &&
!isActiveLane(laneId)
);
return consume;
}
else {
return groupLaneIdSet.contains(laneId);
}
}
}
@Override
public String getLaneHeaderKey() {
return TSF_LANE_ID;
}
@Override
public String formatLaneId(String laneId) {
if (StringUtils.isEmpty(laneId)) {
return laneId;
}
if (!laneId.contains("/") && laneId.startsWith("lane-")) {
laneId = "tsf/" + laneId;
}
return laneId;
}
}

@ -15,8 +15,9 @@
* specific language governing permissions and limitations under the License.
*/
package com.tencent.cloud.plugin.mq.lane.tsf;
package com.tencent.cloud.plugin.mq.lane;
import com.tencent.cloud.plugin.mq.lane.tsf.TsfActiveLane;
import com.tencent.polaris.api.pojo.RegistryCacheValue;
import com.tencent.polaris.api.pojo.ServiceEventKey;
import org.junit.jupiter.api.BeforeEach;
@ -29,23 +30,23 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
/**
* Test for {@link TsfLaneRuleListener}.
* Test for {@link LaneRuleListener}.
*/
public class TsfLaneRuleListenerTest {
public class LaneRuleListenerTest {
private TsfActiveLane tsfActiveLane;
private TsfLaneRuleListener tsfLaneRuleListener;
private LaneRuleListener laneRuleListener;
@BeforeEach
public void setUp() {
tsfActiveLane = mock(TsfActiveLane.class);
tsfLaneRuleListener = new TsfLaneRuleListener(tsfActiveLane);
laneRuleListener = new LaneRuleListener(tsfActiveLane::freshLaneStatus);
}
@Test
public void testConstructorInitialization() {
// Verify that constructor properly initializes the TsfActiveLane dependency
assert tsfLaneRuleListener != null;
assert laneRuleListener != null;
}
@Test
@ -57,7 +58,7 @@ public class TsfLaneRuleListenerTest {
when(svcEventKey.getEventType()).thenReturn(ServiceEventKey.EventType.LANE_RULE);
// When
tsfLaneRuleListener.onResourceAdd(svcEventKey, newValue);
laneRuleListener.onResourceAdd(svcEventKey, newValue);
// Then
verify(tsfActiveLane).freshLaneStatus();
@ -72,7 +73,7 @@ public class TsfLaneRuleListenerTest {
when(svcEventKey.getEventType()).thenReturn(ServiceEventKey.EventType.INSTANCE);
// When
tsfLaneRuleListener.onResourceAdd(svcEventKey, newValue);
laneRuleListener.onResourceAdd(svcEventKey, newValue);
// Then
verify(tsfActiveLane, never()).freshLaneStatus();
@ -88,7 +89,7 @@ public class TsfLaneRuleListenerTest {
when(svcEventKey.getEventType()).thenReturn(ServiceEventKey.EventType.LANE_RULE);
// When
tsfLaneRuleListener.onResourceUpdated(svcEventKey, oldValue, newValue);
laneRuleListener.onResourceUpdated(svcEventKey, oldValue, newValue);
// Then
verify(tsfActiveLane).freshLaneStatus();
@ -104,7 +105,7 @@ public class TsfLaneRuleListenerTest {
when(svcEventKey.getEventType()).thenReturn(ServiceEventKey.EventType.SERVICE);
// When
tsfLaneRuleListener.onResourceUpdated(svcEventKey, oldValue, newValue);
laneRuleListener.onResourceUpdated(svcEventKey, oldValue, newValue);
// Then
verify(tsfActiveLane, never()).freshLaneStatus();
@ -119,7 +120,7 @@ public class TsfLaneRuleListenerTest {
when(svcEventKey.getEventType()).thenReturn(ServiceEventKey.EventType.LANE_RULE);
// When
tsfLaneRuleListener.onResourceDeleted(svcEventKey, oldValue);
laneRuleListener.onResourceDeleted(svcEventKey, oldValue);
// Then
verify(tsfActiveLane).freshLaneStatus();
@ -134,7 +135,7 @@ public class TsfLaneRuleListenerTest {
when(svcEventKey.getEventType()).thenReturn(ServiceEventKey.EventType.ROUTING);
// When
tsfLaneRuleListener.onResourceDeleted(svcEventKey, oldValue);
laneRuleListener.onResourceDeleted(svcEventKey, oldValue);
// Then
verify(tsfActiveLane, never()).freshLaneStatus();
@ -150,7 +151,7 @@ public class TsfLaneRuleListenerTest {
when(svcEventKey.getEventType()).thenReturn(eventType);
// When
tsfLaneRuleListener.onResourceAdd(svcEventKey, value);
laneRuleListener.onResourceAdd(svcEventKey, value);
// Then
if (eventType == ServiceEventKey.EventType.LANE_RULE) {

@ -0,0 +1,181 @@
/*
* Tencent is pleased to support the open source community by making spring-cloud-tencent available.
*
* Copyright (C) 2021 Tencent. All rights reserved.
*
* Licensed under the BSD 3-Clause License (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.tencent.cloud.plugin.mq.lane;
import java.lang.reflect.Field;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import com.tencent.cloud.common.tsf.TsfContextUtils;
import com.tencent.cloud.common.util.JacksonUtils;
import com.tencent.cloud.plugin.mq.lane.kafka.KafkaLaneProperties;
import com.tencent.cloud.polaris.context.PolarisSDKContextManager;
import com.tencent.cloud.polaris.discovery.PolarisDiscoveryHandler;
import com.tencent.polaris.api.pojo.Instance;
import com.tencent.polaris.client.api.SDKContext;
import com.tencent.polaris.plugins.router.lane.BaseLaneMode;
import com.tencent.polaris.plugins.router.lane.LaneUtils;
import com.tencent.polaris.specification.api.v1.traffic.manage.LaneProto;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.springframework.cloud.client.serviceregistry.Registration;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
/**
* Test for {@link PolarisActiveLane}.
* Instance and service status change.
*/
public class PolarisActiveLaneTest {
private static final String CURRENT_INSTANCE_ID = "current-instance-id";
private KafkaLaneProperties kafkaLaneProperties;
private PolarisActiveLane polarisActiveLane;
private PolarisSDKContextManager polarisSDKContextManager;
private Registration registration;
private LaneProto.LaneGroup group;
private MockedStatic<LaneUtils> laneUtilsMockedStatic;
private MockedStatic<JacksonUtils> jacksonUtilsMockedStatic;
private MockedStatic<TsfContextUtils> tsfContextUtilsMockedStatic;
@BeforeEach
public void setUp() throws Exception {
group = mock(LaneProto.LaneGroup.class);
kafkaLaneProperties = new KafkaLaneProperties();
kafkaLaneProperties.setLaneOn(true);
polarisSDKContextManager = mock(PolarisSDKContextManager.class);
when(polarisSDKContextManager.getSDKContext()).thenReturn(mock(SDKContext.class));
registration = mock(Registration.class);
when(registration.getInstanceId()).thenReturn(CURRENT_INSTANCE_ID);
polarisActiveLane = new PolarisActiveLane(polarisSDKContextManager, mock(PolarisDiscoveryHandler.class), registration);
laneUtilsMockedStatic = Mockito.mockStatic(LaneUtils.class);
jacksonUtilsMockedStatic = Mockito.mockStatic(JacksonUtils.class);
tsfContextUtilsMockedStatic = Mockito.mockStatic(TsfContextUtils.class);
jacksonUtilsMockedStatic.when(() -> JacksonUtils.serialize2Json(any())).thenReturn("{}");
}
@AfterEach
public void tearDown() {
laneUtilsMockedStatic.close();
jacksonUtilsMockedStatic.close();
tsfContextUtilsMockedStatic.close();
}
@Test
public void testCallback() throws Throwable {
// not in lane
assertThat(polarisActiveLane.currentInstanceInLane()).isFalse();
// in lane
Map<String, String> metadata = new HashMap<>();
metadata.put("lane", "test-lane");
Instance instance = mock(Instance.class);
when(instance.getId()).thenReturn(CURRENT_INSTANCE_ID);
when(instance.getMetadata()).thenReturn(metadata);
laneUtilsMockedStatic.when(() -> LaneUtils.getLaneGroups(any(), any()))
.thenReturn(Collections.singletonList(group));
polarisActiveLane.callback(Collections.singletonList(instance));
assertThat(polarisActiveLane.currentInstanceInLane()).isTrue();
// instance not in lane
metadata.remove("lane");
polarisActiveLane.callback(Collections.singletonList(instance));
assertThat(polarisActiveLane.currentInstanceInLane()).isFalse();
// reset, instance in lane
metadata.put("lane", "test-lane");
polarisActiveLane.callback(Collections.singletonList(instance));
assertThat(polarisActiveLane.currentInstanceInLane()).isTrue();
// service not in lane
laneUtilsMockedStatic.when(() -> LaneUtils.getLaneGroups(any(), any())).thenReturn(Collections.emptyList());
polarisActiveLane.freshLaneStatus(); // rule listener will call this method
assertThat(polarisActiveLane.currentInstanceInLane()).isFalse();
}
@Test
public void testCallback_baseLaneMode1() throws Throwable {
Field baseLaneModeField = PolarisActiveLane.class.getDeclaredField("baseLaneMode");
baseLaneModeField.setAccessible(true);
baseLaneModeField.set(polarisActiveLane, BaseLaneMode.EXCLUDE_ENABLED_LANE_INSTANCE);
// not in lane
assertThat(polarisActiveLane.currentInstanceInLane()).isFalse();
// not in lane
Map<String, String> metadata = new HashMap<>();
metadata.put("lane", "lane-not-exist");
Instance instance = mock(Instance.class);
when(instance.getId()).thenReturn(CURRENT_INSTANCE_ID);
when(instance.getMetadata()).thenReturn(metadata);
laneUtilsMockedStatic.when(() -> LaneUtils.getLaneGroups(any(), any()))
.thenReturn(Collections.singletonList(group));
LaneProto.LaneRule rule = mock(LaneProto.LaneRule.class);
when(rule.getDefaultLabelValue()).thenReturn("lane-exist");
when(group.getRulesList()).thenReturn(Collections.singletonList(rule));
polarisActiveLane.callback(Collections.singletonList(instance));
assertThat(polarisActiveLane.currentInstanceInLane()).isFalse();
}
@Test
public void testCallback_baseLaneMode2() throws Throwable {
Field baseLaneModeField = PolarisActiveLane.class.getDeclaredField("baseLaneMode");
baseLaneModeField.setAccessible(true);
baseLaneModeField.set(polarisActiveLane, BaseLaneMode.EXCLUDE_ENABLED_LANE_INSTANCE);
// not in lane
assertThat(polarisActiveLane.currentInstanceInLane()).isFalse();
// in lane
Map<String, String> metadata = new HashMap<>();
metadata.put("lane", "lane-exist");
Instance instance = mock(Instance.class);
when(instance.getId()).thenReturn(CURRENT_INSTANCE_ID);
when(instance.getMetadata()).thenReturn(metadata);
laneUtilsMockedStatic.when(() -> LaneUtils.getLaneGroups(any(), any()))
.thenReturn(Collections.singletonList(group));
LaneProto.LaneRule rule = mock(LaneProto.LaneRule.class);
when(rule.getDefaultLabelValue()).thenReturn("lane-exist");
when(group.getRulesList()).thenReturn(Collections.singletonList(rule));
polarisActiveLane.callback(Collections.singletonList(instance));
assertThat(polarisActiveLane.currentInstanceInLane()).isTrue();
}
}

@ -30,6 +30,7 @@ import org.junit.jupiter.api.Test;
import org.springframework.boot.autoconfigure.AutoConfigurations;
import org.springframework.boot.test.context.runner.ApplicationContextRunner;
import org.springframework.cloud.client.serviceregistry.Registration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@ -155,25 +156,6 @@ public class KafkaLaneAspectConfigurationTest {
});
}
@Test
public void testTsfActiveLaneBeanCreationWhenOnlyTsfConsulEnabled() {
resetTsfContextUtilsStaticFields();
// Simulate Only TSF Consul enabled condition
contextRunner
.withPropertyValues(
"tsf_consul_enable=true",
"tsf_consul_ip=127.0.0.1",
"spring.cloud.polaris.address=" // Empty to simulate only TSF Consul
)
.run(context -> {
// TsfActiveLane should be created when only TSF Consul is enabled
assertThat(context).hasSingleBean(TsfActiveLane.class);
TsfActiveLane tsfActiveLane = context.getBean(TsfActiveLane.class);
assertThat(tsfActiveLane).isNotNull();
});
}
@Configuration
static class MockConfiguration {
@Bean
@ -185,5 +167,10 @@ public class KafkaLaneAspectConfigurationTest {
public PolarisDiscoveryHandler polarisDiscoveryHandler() {
return mock(PolarisDiscoveryHandler.class);
}
@Bean
public Registration registration() {
return mock(Registration.class);
}
}
}

@ -20,14 +20,16 @@ package com.tencent.cloud.plugin.mq.lane.kafka;
import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import com.tencent.cloud.common.tsf.TsfContextUtils;
import com.tencent.cloud.plugin.mq.lane.tsf.TsfActiveLane;
import com.tencent.cloud.plugin.mq.lane.PolarisActiveLane;
import com.tencent.cloud.polaris.context.PolarisSDKContextManager;
import com.tencent.cloud.polaris.discovery.PolarisDiscoveryHandler;
import com.tencent.polaris.plugins.router.lane.LaneUtils;
import com.tencent.polaris.specification.api.v1.traffic.manage.LaneProto;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
@ -38,6 +40,7 @@ import org.junit.jupiter.api.Test;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.springframework.cloud.client.serviceregistry.Registration;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.converter.RecordMessageConverter;
@ -51,64 +54,46 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
/**
* Test for {@link KafkaLaneAspect}.
* Test for {@link KafkaLaneAspect}. Instance in lane.
*/
public class KafkaLaneAspectTest {
private KafkaLaneAspect kafkaLaneAspect;
private PolarisSDKContextManager polarisSDKContextManager;
private KafkaLaneProperties kafkaLaneProperties;
private TsfActiveLane tsfActiveLane;
private PolarisActiveLane polarisActiveLane;
private LaneProto.LaneGroup group;
private MockedStatic<LaneUtils> laneUtilsMockedStatic;
private MockedStatic<TsfContextUtils> tsfContextUtilsMockedStatic;
@BeforeEach
public void setUp() {
public void setUp() throws Exception {
polarisSDKContextManager = mock(PolarisSDKContextManager.class);
group = mock(LaneProto.LaneGroup.class);
kafkaLaneProperties = new KafkaLaneProperties();
tsfActiveLane = mock(TsfActiveLane.class);
polarisActiveLane = new PolarisActiveLane(mock(PolarisSDKContextManager.class), mock(PolarisDiscoveryHandler.class), mock(Registration.class));
laneUtilsMockedStatic = Mockito.mockStatic(LaneUtils.class);
tsfContextUtilsMockedStatic = Mockito.mockStatic(TsfContextUtils.class);
Field laneField = PolarisActiveLane.class.getDeclaredField("instanceLaneTag");
laneField.setAccessible(true);
laneField.set(polarisActiveLane, "test-lane"); // in lane
Field groupsField = PolarisActiveLane.class.getDeclaredField("groups");
groupsField.setAccessible(true);
groupsField.set(polarisActiveLane, Collections.singletonList(group));
Field serviceInLaneField = PolarisActiveLane.class.getDeclaredField("serviceInLane");
serviceInLaneField.setAccessible(true);
serviceInLaneField.setBoolean(polarisActiveLane, true);
kafkaLaneAspect = new KafkaLaneAspect(polarisSDKContextManager, kafkaLaneProperties, tsfActiveLane);
kafkaLaneAspect = new KafkaLaneAspect(polarisSDKContextManager, kafkaLaneProperties, polarisActiveLane);
}
@AfterEach
public void tearDown() {
laneUtilsMockedStatic.close();
tsfContextUtilsMockedStatic.close();
resetTsfContextUtilsStaticFields();
}
private void resetTsfContextUtilsStaticFields() {
try {
// Reset isOnlyTsfConsulEnabledFirstConfiguration
Field isOnlyTsfConsulEnabledFirstConfigurationField = TsfContextUtils.class.getDeclaredField("isOnlyTsfConsulEnabledFirstConfiguration");
isOnlyTsfConsulEnabledFirstConfigurationField.setAccessible(true);
AtomicBoolean isOnlyTsfConsulEnabledFirstConfiguration = (AtomicBoolean) isOnlyTsfConsulEnabledFirstConfigurationField.get(null);
isOnlyTsfConsulEnabledFirstConfiguration.set(true);
// Reset isTsfConsulEnabledFirstConfiguration
Field isTsfConsulEnabledFirstConfigurationField = TsfContextUtils.class.getDeclaredField("isTsfConsulEnabledFirstConfiguration");
isTsfConsulEnabledFirstConfigurationField.setAccessible(true);
AtomicBoolean isTsfConsulEnabledFirstConfiguration = (AtomicBoolean) isTsfConsulEnabledFirstConfigurationField.get(null);
isTsfConsulEnabledFirstConfiguration.set(true);
// Reset tsfConsulEnabled
Field tsfConsulEnabledField = TsfContextUtils.class.getDeclaredField("tsfConsulEnabled");
tsfConsulEnabledField.setAccessible(true);
tsfConsulEnabledField.setBoolean(null, false);
// Reset onlyTsfConsulEnabled
Field onlyTsfConsulEnabledField = TsfContextUtils.class.getDeclaredField("onlyTsfConsulEnabled");
onlyTsfConsulEnabledField.setAccessible(true);
onlyTsfConsulEnabledField.setBoolean(null, false);
}
catch (Exception e) {
throw new RuntimeException("Failed to reset TsfContextUtils static fields", e);
}
}
@Test
@ -166,7 +151,7 @@ public class KafkaLaneAspectTest {
kafkaLaneAspect.aroundProducerMessage(pjp);
// Then
Iterator<Header> headers = producerRecord.headers().headers("X-Polaris-Metadata-Transitive-service-lane").iterator();
Iterator<Header> headers = producerRecord.headers().headers(polarisActiveLane.getLaneHeaderKey()).iterator();
assertThat(headers.hasNext()).isTrue();
Header laneHeader = headers.next();
assertThat(new String(laneHeader.value(), StandardCharsets.UTF_8)).isEqualTo(laneId);
@ -213,13 +198,21 @@ public class KafkaLaneAspectTest {
}
@Test
public void testConsumerAspectWithLaneHeader() throws Throwable {
public void testConsumerAspectWithLaneHeader_LaneIdExist() throws Throwable {
// Given
LaneProto.LaneRule rule = mock(LaneProto.LaneRule.class);
when(group.getRulesList()).thenReturn(Collections.singletonList(rule));
when(rule.getGroupName()).thenReturn("test-group");
when(rule.getName()).thenReturn("test-lane-name");
when(rule.getDefaultLabelValue()).thenReturn("test-lane");
kafkaLaneProperties.setLaneOn(true);
String laneId = "test-lane-id";
String laneId = "test-group/test-lane-name"; // valid lane id
laneUtilsMockedStatic.when(() -> LaneUtils.buildStainLabel(rule)).thenReturn(laneId);
ConsumerRecord consumerRecord = new ConsumerRecord<>("test-topic", 0, 0L, "key", "value");
consumerRecord.headers().add("X-Polaris-Metadata-Transitive-service-lane", laneId.getBytes(StandardCharsets.UTF_8));
consumerRecord.headers().add(polarisActiveLane.getLaneHeaderKey(), laneId.getBytes(StandardCharsets.UTF_8));
ProceedingJoinPoint pjp = mock(ProceedingJoinPoint.class);
Object[] args = new Object[] {consumerRecord};
@ -230,7 +223,7 @@ public class KafkaLaneAspectTest {
Object result = kafkaLaneAspect.aroundConsumerMessage(pjp);
// Then
assertThat(result).isEqualTo(KafkaLaneAspect.EMPTY_OBJECT);
assertThat(result).isEqualTo("result");
}
@Test
@ -238,7 +231,8 @@ public class KafkaLaneAspectTest {
// Given
ConsumerRecord consumerRecord = new ConsumerRecord<>("test-topic", 0, 0L, "key", "value");
String expectedLaneId = "test-lane-id";
consumerRecord.headers().add("X-Polaris-Metadata-Transitive-service-lane", expectedLaneId.getBytes(StandardCharsets.UTF_8));
consumerRecord.headers()
.add(polarisActiveLane.getLaneHeaderKey(), expectedLaneId.getBytes(StandardCharsets.UTF_8));
// When
String laneId = kafkaLaneAspect.getConsumerRecordLaneId(consumerRecord);
@ -258,29 +252,24 @@ public class KafkaLaneAspectTest {
String laneId = kafkaLaneAspect.getConsumerRecordLaneId(consumerRecord);
// Then
assertThat(laneId).isEqualTo("tsf/" + expectedLaneId);
assertThat(laneId).isEqualTo(expectedLaneId);
}
@Test
public void testIfConsumeInPolarisWithNoLaneId() {
public void testIfConsumeWithNoLaneId() {
// Given
kafkaLaneProperties.setLaneConsumeMain(true);
// Use reflection to set the lane field
try {
Field laneField = KafkaLaneAspect.class.getDeclaredField("lane");
laneField.setAccessible(true);
laneField.set(kafkaLaneAspect, "test-lane");
}
catch (Exception e) {
throw new RuntimeException(e);
}
// When
boolean shouldConsume = kafkaLaneAspect.ifConsume("");
// Then
assertThat(shouldConsume).isTrue(); // Because laneConsumeMain is true
// Given
kafkaLaneProperties.setLaneConsumeMain(false);
// When
shouldConsume = kafkaLaneAspect.ifConsume("");
// Then
assertThat(shouldConsume).isFalse(); // Because laneConsumeMain is false
}
@Test
@ -290,9 +279,9 @@ public class KafkaLaneAspectTest {
List<ConsumerRecord> messageList = new ArrayList<>();
ConsumerRecord record1 = new ConsumerRecord<>("test-topic", 0, 0L, "key", "value1");
record1.headers().add("X-Polaris-Metadata-Transitive-service-lane", "lane1".getBytes(StandardCharsets.UTF_8));
record1.headers().add(polarisActiveLane.getLaneHeaderKey(), "lane1".getBytes(StandardCharsets.UTF_8));
ConsumerRecord record2 = new ConsumerRecord<>("test-topic", 0, 1L, "key", "value2");
record2.headers().add("X-Polaris-Metadata-Transitive-service-lane", "lane2".getBytes(StandardCharsets.UTF_8));
record2.headers().add(polarisActiveLane.getLaneHeaderKey(), "lane2".getBytes(StandardCharsets.UTF_8));
messageList.add(record1);
messageList.add(record2);

@ -0,0 +1,333 @@
/*
* Tencent is pleased to support the open source community by making spring-cloud-tencent available.
*
* Copyright (C) 2021 Tencent. All rights reserved.
*
* Licensed under the BSD 3-Clause License (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.tencent.cloud.plugin.mq.lane.kafka;
import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import com.tencent.cloud.common.tsf.TsfContextUtils;
import com.tencent.cloud.plugin.mq.lane.PolarisActiveLane;
import com.tencent.cloud.polaris.context.PolarisSDKContextManager;
import com.tencent.cloud.polaris.discovery.PolarisDiscoveryHandler;
import com.tencent.polaris.plugins.router.lane.LaneUtils;
import com.tencent.polaris.specification.api.v1.traffic.manage.LaneProto;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.aspectj.lang.ProceedingJoinPoint;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.springframework.cloud.client.serviceregistry.Registration;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
/**
* Test for {@link KafkaLaneAspect}. Instance and service not in lane.
*/
public class KafkaLaneAspectTest2 {
private KafkaLaneAspect kafkaLaneAspect;
private PolarisSDKContextManager polarisSDKContextManager;
private KafkaLaneProperties kafkaLaneProperties;
private PolarisActiveLane polarisActiveLane;
private LaneProto.LaneGroup group;
private MockedStatic<LaneUtils> laneUtilsMockedStatic;
private MockedStatic<TsfContextUtils> tsfContextUtilsMockedStatic;
@BeforeEach
public void setUp() throws Exception {
polarisSDKContextManager = mock(PolarisSDKContextManager.class);
group = mock(LaneProto.LaneGroup.class);
kafkaLaneProperties = new KafkaLaneProperties();
polarisActiveLane = new PolarisActiveLane(mock(PolarisSDKContextManager.class), mock(PolarisDiscoveryHandler.class), mock(Registration.class));
laneUtilsMockedStatic = Mockito.mockStatic(LaneUtils.class);
tsfContextUtilsMockedStatic = Mockito.mockStatic(TsfContextUtils.class);
Field laneField = PolarisActiveLane.class.getDeclaredField("instanceLaneTag");
laneField.setAccessible(true);
laneField.set(polarisActiveLane, ""); // not in lane
Field groupsField = PolarisActiveLane.class.getDeclaredField("groups");
groupsField.setAccessible(true);
groupsField.set(polarisActiveLane, Collections.singletonList(group));
Field serviceInLaneField = PolarisActiveLane.class.getDeclaredField("serviceInLane");
serviceInLaneField.setAccessible(true);
serviceInLaneField.setBoolean(polarisActiveLane, false);
kafkaLaneAspect = new KafkaLaneAspect(polarisSDKContextManager, kafkaLaneProperties, polarisActiveLane);
}
@AfterEach
public void tearDown() {
laneUtilsMockedStatic.close();
tsfContextUtilsMockedStatic.close();
}
@Test
public void testProducerAspectWhenLaneDisabled() throws Throwable {
// Given
kafkaLaneProperties.setLaneOn(false);
ProceedingJoinPoint pjp = mock(ProceedingJoinPoint.class);
Object[] args = new Object[] {mock(ProducerRecord.class)};
when(pjp.getArgs()).thenReturn(args);
when(pjp.proceed(args)).thenReturn("result");
// When
Object result = kafkaLaneAspect.aroundProducerMessage(pjp);
// Then
assertThat(result).isEqualTo("result");
verify(pjp).proceed(args);
}
@Test
public void testProducerAspectWhenNoLaneId() throws Throwable {
// Given
kafkaLaneProperties.setLaneOn(true);
laneUtilsMockedStatic.when(() -> LaneUtils.fetchLaneByCaller(any(), any(), any())).thenReturn(null);
ProceedingJoinPoint pjp = mock(ProceedingJoinPoint.class);
Object[] args = new Object[] {mock(ProducerRecord.class)};
when(pjp.getArgs()).thenReturn(args);
when(pjp.proceed(args)).thenReturn("result");
// When
Object result = kafkaLaneAspect.aroundProducerMessage(pjp);
// Then
assertThat(result).isEqualTo("result");
verify(pjp).proceed(args);
}
@Test
public void testProducerAspectWithProducerRecord() throws Throwable {
// Given
kafkaLaneProperties.setLaneOn(true);
String laneId = "test-lane-id";
laneUtilsMockedStatic.when(() -> LaneUtils.fetchLaneByCaller(any(), any(), any())).thenReturn(laneId);
ProceedingJoinPoint pjp = mock(ProceedingJoinPoint.class);
KafkaTemplate kafkaTemplate = mock(KafkaTemplate.class);
ProducerRecord producerRecord = new ProducerRecord<>("test-topic", "test-value");
when(pjp.getTarget()).thenReturn(kafkaTemplate);
when(pjp.getArgs()).thenReturn(new Object[] {producerRecord});
when(kafkaTemplate.send(producerRecord)).thenReturn(null);
// When
kafkaLaneAspect.aroundProducerMessage(pjp);
// Then
Iterator<Header> headers = producerRecord.headers().headers(polarisActiveLane.getLaneHeaderKey()).iterator();
assertThat(headers.hasNext()).isTrue();
Header laneHeader = headers.next();
assertThat(new String(laneHeader.value(), StandardCharsets.UTF_8)).isEqualTo(laneId);
}
@Test
public void testProducerAspectWithMessage() throws Throwable {
// Given
kafkaLaneProperties.setLaneOn(true);
String laneId = "test-lane-id";
laneUtilsMockedStatic.when(() -> LaneUtils.fetchLaneByCaller(any(), any(), any())).thenReturn(laneId);
ProceedingJoinPoint pjp = mock(ProceedingJoinPoint.class);
KafkaTemplate kafkaTemplate = mock(KafkaTemplate.class);
RecordMessageConverter recordMessageConverter = mock(RecordMessageConverter.class);
Message message = MessageBuilder.withPayload("test-payload").build();
ProducerRecord producerRecord = new ProducerRecord<>("test-topic", "test-value");
when(pjp.getTarget()).thenReturn(kafkaTemplate);
when(pjp.getArgs()).thenReturn(new Object[] {message});
when(kafkaTemplate.getDefaultTopic()).thenReturn("test-topic");
when(kafkaTemplate.send(producerRecord)).thenReturn(null);
when(kafkaTemplate.getMessageConverter()).thenReturn(recordMessageConverter);
// When
kafkaLaneAspect.aroundProducerMessage(pjp);
}
@Test
public void testConsumerAspectWhenLaneDisabled() throws Throwable {
// Given
kafkaLaneProperties.setLaneOn(false);
ProceedingJoinPoint pjp = mock(ProceedingJoinPoint.class);
Object[] args = new Object[] {mock(ConsumerRecord.class)};
when(pjp.getArgs()).thenReturn(args);
when(pjp.proceed(args)).thenReturn("result");
// When
Object result = kafkaLaneAspect.aroundConsumerMessage(pjp);
// Then
assertThat(result).isEqualTo("result");
verify(pjp).proceed(args);
}
@Test
public void testConsumerAspectWithLaneHeader_LaneIdExist() throws Throwable {
// Given
LaneProto.LaneRule rule = mock(LaneProto.LaneRule.class);
when(group.getRulesList()).thenReturn(Collections.singletonList(rule));
when(rule.getGroupName()).thenReturn("test-group");
when(rule.getName()).thenReturn("test-lane-name");
when(rule.getDefaultLabelValue()).thenReturn("test-lane");
kafkaLaneProperties.setLaneOn(true);
String laneId = "test-group/test-lane-name"; // valid lane id
laneUtilsMockedStatic.when(() -> LaneUtils.buildStainLabel(rule)).thenReturn(laneId);
ConsumerRecord consumerRecord = new ConsumerRecord<>("test-topic", 0, 0L, "key", "value");
consumerRecord.headers().add(polarisActiveLane.getLaneHeaderKey(), laneId.getBytes(StandardCharsets.UTF_8));
ProceedingJoinPoint pjp = mock(ProceedingJoinPoint.class);
Object[] args = new Object[] {consumerRecord};
when(pjp.getArgs()).thenReturn(args);
when(pjp.proceed(args)).thenReturn("result");
// When
Object result = kafkaLaneAspect.aroundConsumerMessage(pjp);
// Then
assertThat(result).isEqualTo(KafkaLaneAspect.EMPTY_OBJECT);
// change main consume lane
kafkaLaneProperties.setMainConsumeLane(true);
result = kafkaLaneAspect.aroundConsumerMessage(pjp);
assertThat(result).isEqualTo("result");
//reset
kafkaLaneProperties.setMainConsumeLane(false);
}
@Test
public void testGetConsumerRecordLaneIdFromHeader() {
// Given
ConsumerRecord consumerRecord = new ConsumerRecord<>("test-topic", 0, 0L, "key", "value");
String expectedLaneId = "test-lane-id";
consumerRecord.headers()
.add(polarisActiveLane.getLaneHeaderKey(), expectedLaneId.getBytes(StandardCharsets.UTF_8));
// When
String laneId = kafkaLaneAspect.getConsumerRecordLaneId(consumerRecord);
// Then
assertThat(laneId).isEqualTo(expectedLaneId);
}
@Test
public void testGetConsumerRecordLaneIdFromCallerLane() {
// Given
ConsumerRecord consumerRecord = new ConsumerRecord<>("test-topic", 0, 0L, "key", "value");
String expectedLaneId = "lane-test";
laneUtilsMockedStatic.when(() -> LaneUtils.getCallerLaneId()).thenReturn(expectedLaneId);
// When
String laneId = kafkaLaneAspect.getConsumerRecordLaneId(consumerRecord);
// Then
assertThat(laneId).isEqualTo(expectedLaneId);
}
@Test
public void testIfConsumeWithNoLaneId() {
// Given
kafkaLaneProperties.setLaneConsumeMain(true);
// When
boolean shouldConsume = kafkaLaneAspect.ifConsume("");
// Then
assertThat(shouldConsume).isTrue(); // as baseline
// Given
kafkaLaneProperties.setLaneConsumeMain(false);
// When
shouldConsume = kafkaLaneAspect.ifConsume("");
// Then
assertThat(shouldConsume).isTrue(); // unrelated to LaneConsumeMain
}
@Test
public void testConsumerAspectWithBatchMessages() throws Throwable {
// Given
kafkaLaneProperties.setLaneOn(true);
List<ConsumerRecord> messageList = new ArrayList<>();
ConsumerRecord record1 = new ConsumerRecord<>("test-topic", 0, 0L, "key", "value1");
record1.headers().add(polarisActiveLane.getLaneHeaderKey(), "lane1".getBytes(StandardCharsets.UTF_8));
ConsumerRecord record2 = new ConsumerRecord<>("test-topic", 0, 1L, "key", "value2");
record2.headers().add(polarisActiveLane.getLaneHeaderKey(), "lane2".getBytes(StandardCharsets.UTF_8));
messageList.add(record1);
messageList.add(record2);
Acknowledgment acknowledgment = mock(Acknowledgment.class);
ProceedingJoinPoint pjp = mock(ProceedingJoinPoint.class);
Object[] args = new Object[] {messageList, acknowledgment};
when(pjp.getArgs()).thenReturn(args);
when(pjp.proceed(any())).thenReturn("result");
// When
Object result = kafkaLaneAspect.aroundConsumerMessage(pjp);
// Then
assertThat(result).isEqualTo("result");
verify(pjp).proceed(any());
}
@Test
public void testConsumerAspectWithEmptyBatch() throws Throwable {
// Given
kafkaLaneProperties.setLaneOn(true);
List<ConsumerRecord> emptyList = new ArrayList<>();
ProceedingJoinPoint pjp = mock(ProceedingJoinPoint.class);
Object[] args = new Object[] {emptyList};
when(pjp.getArgs()).thenReturn(args);
when(pjp.proceed(args)).thenReturn("result");
// When
Object result = kafkaLaneAspect.aroundConsumerMessage(pjp);
// Then
assertThat(result).isEqualTo("result");
verify(pjp).proceed(args);
}
}

@ -0,0 +1,172 @@
/*
* Tencent is pleased to support the open source community by making spring-cloud-tencent available.
*
* Copyright (C) 2021 Tencent. All rights reserved.
*
* Licensed under the BSD 3-Clause License (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.tencent.cloud.plugin.mq.lane.kafka;
import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import com.tencent.cloud.common.tsf.TsfContextUtils;
import com.tencent.cloud.plugin.mq.lane.PolarisActiveLane;
import com.tencent.cloud.polaris.context.PolarisSDKContextManager;
import com.tencent.cloud.polaris.discovery.PolarisDiscoveryHandler;
import com.tencent.polaris.plugins.router.lane.LaneUtils;
import com.tencent.polaris.specification.api.v1.traffic.manage.LaneProto;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.aspectj.lang.ProceedingJoinPoint;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.springframework.cloud.client.serviceregistry.Registration;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
/**
* Test for {@link KafkaLaneAspect}.
* Instance and service status change.
*/
public class KafkaLaneAspectTest3 {
private KafkaLaneAspect kafkaLaneAspect;
private PolarisSDKContextManager polarisSDKContextManager;
private KafkaLaneProperties kafkaLaneProperties;
private PolarisActiveLane polarisActiveLane;
private LaneProto.LaneGroup group;
private MockedStatic<LaneUtils> laneUtilsMockedStatic;
private MockedStatic<TsfContextUtils> tsfContextUtilsMockedStatic;
@BeforeEach
public void setUp() throws Exception {
polarisSDKContextManager = mock(PolarisSDKContextManager.class);
group = mock(LaneProto.LaneGroup.class);
kafkaLaneProperties = new KafkaLaneProperties();
kafkaLaneProperties.setLaneOn(true);
polarisActiveLane = new PolarisActiveLane(mock(PolarisSDKContextManager.class), mock(PolarisDiscoveryHandler.class), mock(Registration.class));
laneUtilsMockedStatic = Mockito.mockStatic(LaneUtils.class);
tsfContextUtilsMockedStatic = Mockito.mockStatic(TsfContextUtils.class);
kafkaLaneAspect = new KafkaLaneAspect(polarisSDKContextManager, kafkaLaneProperties, polarisActiveLane);
}
@AfterEach
public void tearDown() {
laneUtilsMockedStatic.close();
tsfContextUtilsMockedStatic.close();
}
@Test
public void testConsumerAspectWithLaneHeader_toInLane() throws Throwable {
// not in lane
Field laneField = PolarisActiveLane.class.getDeclaredField("instanceLaneTag");
laneField.setAccessible(true);
laneField.set(polarisActiveLane, "");
Field groupsField = PolarisActiveLane.class.getDeclaredField("groups");
groupsField.setAccessible(true);
groupsField.set(polarisActiveLane, Collections.singletonList(group));
Field serviceInLaneField = PolarisActiveLane.class.getDeclaredField("serviceInLane");
serviceInLaneField.setAccessible(true);
serviceInLaneField.setBoolean(polarisActiveLane, false);
String laneId = "test-group/test-lane-name"; // valid lane id
ConsumerRecord consumerRecord = new ConsumerRecord<>("test-topic", 0, 0L, "key", "value");
consumerRecord.headers().add(polarisActiveLane.getLaneHeaderKey(), laneId.getBytes(StandardCharsets.UTF_8));
ProceedingJoinPoint pjp = mock(ProceedingJoinPoint.class);
Object[] args = new Object[] {consumerRecord};
when(pjp.getArgs()).thenReturn(args);
when(pjp.proceed(args)).thenReturn("result");
// act
Object result = kafkaLaneAspect.aroundConsumerMessage(pjp);
// verify
assertThat(result).isEqualTo(KafkaLaneAspect.EMPTY_OBJECT); // not in lane, not consume
// in lane
LaneProto.LaneRule rule = mock(LaneProto.LaneRule.class);
when(group.getRulesList()).thenReturn(Collections.singletonList(rule));
when(rule.getGroupName()).thenReturn("test-group");
when(rule.getName()).thenReturn("test-lane-name");
when(rule.getDefaultLabelValue()).thenReturn("test-lane");
laneField.set(polarisActiveLane, "test-lane");
serviceInLaneField.setBoolean(polarisActiveLane, true);
laneUtilsMockedStatic.when(() -> LaneUtils.buildStainLabel(rule)).thenReturn(laneId);
result = kafkaLaneAspect.aroundConsumerMessage(pjp);
assertThat(result).isEqualTo("result");
}
@Test
public void testConsumerAspectWithLaneHeader_toNotInLane() throws Throwable {
// in lane
String laneId = "test-group/test-lane-name"; // valid lane id
LaneProto.LaneRule rule = mock(LaneProto.LaneRule.class);
when(group.getRulesList()).thenReturn(Collections.singletonList(rule));
when(rule.getGroupName()).thenReturn("test-group");
when(rule.getName()).thenReturn("test-lane-name");
when(rule.getDefaultLabelValue()).thenReturn("test-lane");
laneUtilsMockedStatic.when(() -> LaneUtils.buildStainLabel(rule)).thenReturn(laneId);
Field laneField = PolarisActiveLane.class.getDeclaredField("instanceLaneTag");
laneField.setAccessible(true);
laneField.set(polarisActiveLane, "test-lane");
Field groupsField = PolarisActiveLane.class.getDeclaredField("groups");
groupsField.setAccessible(true);
groupsField.set(polarisActiveLane, Collections.singletonList(group));
Field serviceInLaneField = PolarisActiveLane.class.getDeclaredField("serviceInLane");
serviceInLaneField.setAccessible(true);
serviceInLaneField.setBoolean(polarisActiveLane, true);
ConsumerRecord consumerRecord = new ConsumerRecord<>("test-topic", 0, 0L, "key", "value");
consumerRecord.headers().add(polarisActiveLane.getLaneHeaderKey(), laneId.getBytes(StandardCharsets.UTF_8));
ProceedingJoinPoint pjp = mock(ProceedingJoinPoint.class);
Object[] args = new Object[] {consumerRecord};
when(pjp.getArgs()).thenReturn(args);
when(pjp.proceed(args)).thenReturn("result");
// act
Object result = kafkaLaneAspect.aroundConsumerMessage(pjp);
// verify
assertThat(result).isEqualTo("result");
// not in lane
when(group.getRulesList()).thenReturn(Collections.emptyList());
laneField.set(polarisActiveLane, "");
serviceInLaneField.setBoolean(polarisActiveLane, false);
result = kafkaLaneAspect.aroundConsumerMessage(pjp);
assertThat(result).isEqualTo(KafkaLaneAspect.EMPTY_OBJECT);
}
}

@ -0,0 +1,142 @@
/*
* Tencent is pleased to support the open source community by making spring-cloud-tencent available.
*
* Copyright (C) 2021 Tencent. All rights reserved.
*
* Licensed under the BSD 3-Clause License (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.tencent.cloud.plugin.mq.lane.kafka;
import java.lang.reflect.Field;
import com.tencent.cloud.common.tsf.TsfContextUtils;
import com.tencent.cloud.plugin.mq.lane.tsf.TsfActiveLane;
import com.tencent.cloud.polaris.context.PolarisSDKContextManager;
import com.tencent.cloud.polaris.discovery.PolarisDiscoveryHandler;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.boot.autoconfigure.AutoConfigurations;
import org.springframework.boot.test.context.runner.ApplicationContextRunner;
import org.springframework.cloud.client.serviceregistry.Registration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
/**
* Test for {@link KafkaLaneAspectConfiguration}.
*/
public class TsfKafkaLaneAspectConfigurationTest {
private ApplicationContextRunner contextRunner;
@BeforeEach
public void setUp() throws Exception {
// Reset onlyTsfConsulEnabled
Field onlyTsfConsulEnabledField = TsfContextUtils.class.getDeclaredField("onlyTsfConsulEnabled");
onlyTsfConsulEnabledField.setAccessible(true);
onlyTsfConsulEnabledField.setBoolean(null, true);
contextRunner = new ApplicationContextRunner()
.withPropertyValues(
"tsf_consul_enable=true",
"tsf_consul_ip=127.0.0.1",
"spring.cloud.polaris.address=" // Empty to simulate only TSF Consul
)
.withConfiguration(AutoConfigurations.of(KafkaLaneAspectConfiguration.class))
.withUserConfiguration(MockConfiguration.class);
}
@Test
public void testKafkaLaneAspectBeanCreationWhenKafkaTemplatePresent() {
// Simulate KafkaTemplate class presence
contextRunner
.withPropertyValues("spring.cloud.polaris.lane.kafka.lane-on=true")
.run(context -> {
// KafkaLaneAspect should be created when KafkaTemplate is present
assertThat(context).hasSingleBean(KafkaLaneAspect.class);
KafkaLaneAspect aspect = context.getBean(KafkaLaneAspect.class);
assertThat(aspect).isNotNull();
});
}
@Test
public void testKafkaLanePropertiesEnabled() {
contextRunner
.withPropertyValues(
"spring.cloud.polaris.lane.kafka.lane-on=true",
"spring.cloud.polaris.lane.kafka.lane-consume-main=true",
"spring.cloud.polaris.lane.kafka.main-consume-lane=true"
)
.run(context -> {
KafkaLaneProperties properties = context.getBean(KafkaLaneProperties.class);
assertThat(properties.getLaneOn()).isTrue();
assertThat(properties.getLaneConsumeMain()).isTrue();
assertThat(properties.getMainConsumeLane()).isTrue();
});
}
@Test
public void testBeanDependenciesInjection() {
contextRunner
.withPropertyValues("spring.cloud.polaris.lane.kafka.lane-on=true")
.run(context -> {
// Verify that all required dependencies are properly injected
assertThat(context).hasSingleBean(KafkaLaneAspect.class);
KafkaLaneAspect aspect = context.getBean(KafkaLaneAspect.class);
// The aspect should have all required dependencies
assertThat(aspect).isNotNull();
// Verify that KafkaLaneProperties is properly configured
KafkaLaneProperties properties = context.getBean(KafkaLaneProperties.class);
assertThat(properties).isNotNull();
assertThat(properties.getLaneOn()).isTrue();
});
}
@Test
public void testTsfActiveLaneBeanCreationWhenOnlyTsfConsulEnabled() {
// Simulate Only TSF Consul enabled condition
contextRunner
.run(context -> {
// TsfActiveLane should be created when only TSF Consul is enabled
assertThat(context).hasSingleBean(TsfActiveLane.class);
TsfActiveLane tsfActiveLane = context.getBean(TsfActiveLane.class);
assertThat(tsfActiveLane).isNotNull();
});
}
@Configuration
static class MockConfiguration {
@Bean
public PolarisSDKContextManager polarisSDKContextManager() {
return mock(PolarisSDKContextManager.class);
}
@Bean
public PolarisDiscoveryHandler polarisDiscoveryHandler() {
return mock(PolarisDiscoveryHandler.class);
}
@Bean
public Registration registration() {
return mock(Registration.class);
}
}
}

@ -20,14 +20,16 @@ package com.tencent.cloud.plugin.mq.lane.kafka;
import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import com.tencent.cloud.common.tsf.TsfContextUtils;
import com.tencent.cloud.plugin.mq.lane.tsf.TsfActiveLane;
import com.tencent.cloud.polaris.context.PolarisSDKContextManager;
import com.tencent.cloud.polaris.discovery.PolarisDiscoveryHandler;
import com.tencent.polaris.plugins.router.lane.LaneUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
@ -52,7 +54,7 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
/**
* Test for {@link KafkaLaneAspect}, with only tsf consul enabled.
* Test for {@link KafkaLaneAspect}, with only tsf consul enabled. Instance in lane.
*/
public class TsfKafkaLaneAspectTest {
@ -64,15 +66,19 @@ public class TsfKafkaLaneAspectTest {
private MockedStatic<TsfContextUtils> tsfContextUtilsMockedStatic;
@BeforeEach
public void setUp() {
public void setUp() throws Exception {
polarisSDKContextManager = mock(PolarisSDKContextManager.class);
kafkaLaneProperties = new KafkaLaneProperties();
tsfActiveLane = mock(TsfActiveLane.class);
tsfActiveLane = new TsfActiveLane(polarisSDKContextManager, mock(PolarisDiscoveryHandler.class));
laneUtilsMockedStatic = Mockito.mockStatic(LaneUtils.class);
tsfContextUtilsMockedStatic = Mockito.mockStatic(TsfContextUtils.class);
tsfContextUtilsMockedStatic.when(TsfContextUtils::isOnlyTsfConsulEnabled).thenReturn(true);
// reset currentGroupLaneIds, instance in lane
Field currentGroupLaneIdsField = TsfActiveLane.class.getDeclaredField("currentGroupLaneIds");
currentGroupLaneIdsField.setAccessible(true);
currentGroupLaneIdsField.set(tsfActiveLane, new HashSet<>(Collections.singletonList("lane1")));
kafkaLaneAspect = new KafkaLaneAspect(polarisSDKContextManager, kafkaLaneProperties, tsfActiveLane);
}
@ -80,37 +86,6 @@ public class TsfKafkaLaneAspectTest {
public void tearDown() {
laneUtilsMockedStatic.close();
tsfContextUtilsMockedStatic.close();
resetTsfContextUtilsStaticFields();
}
private void resetTsfContextUtilsStaticFields() {
try {
// Reset isOnlyTsfConsulEnabledFirstConfiguration
Field isOnlyTsfConsulEnabledFirstConfigurationField = TsfContextUtils.class.getDeclaredField("isOnlyTsfConsulEnabledFirstConfiguration");
isOnlyTsfConsulEnabledFirstConfigurationField.setAccessible(true);
AtomicBoolean isOnlyTsfConsulEnabledFirstConfiguration = (AtomicBoolean) isOnlyTsfConsulEnabledFirstConfigurationField.get(null);
isOnlyTsfConsulEnabledFirstConfiguration.set(true);
// Reset isTsfConsulEnabledFirstConfiguration
Field isTsfConsulEnabledFirstConfigurationField = TsfContextUtils.class.getDeclaredField("isTsfConsulEnabledFirstConfiguration");
isTsfConsulEnabledFirstConfigurationField.setAccessible(true);
AtomicBoolean isTsfConsulEnabledFirstConfiguration = (AtomicBoolean) isTsfConsulEnabledFirstConfigurationField.get(null);
isTsfConsulEnabledFirstConfiguration.set(true);
// Reset tsfConsulEnabled
Field tsfConsulEnabledField = TsfContextUtils.class.getDeclaredField("tsfConsulEnabled");
tsfConsulEnabledField.setAccessible(true);
tsfConsulEnabledField.setBoolean(null, false);
// Reset onlyTsfConsulEnabled
Field onlyTsfConsulEnabledField = TsfContextUtils.class.getDeclaredField("onlyTsfConsulEnabled");
onlyTsfConsulEnabledField.setAccessible(true);
onlyTsfConsulEnabledField.setBoolean(null, false);
}
catch (Exception e) {
throw new RuntimeException("Failed to reset TsfContextUtils static fields", e);
}
}
@Test
@ -168,7 +143,7 @@ public class TsfKafkaLaneAspectTest {
kafkaLaneAspect.aroundProducerMessage(pjp);
// Then
Iterator<Header> headers = producerRecord.headers().headers("tsf_laneId").iterator();
Iterator<Header> headers = producerRecord.headers().headers(tsfActiveLane.getLaneHeaderKey()).iterator();
assertThat(headers.hasNext()).isTrue();
Header laneHeader = headers.next();
assertThat(new String(laneHeader.value(), StandardCharsets.UTF_8)).isEqualTo(laneId);
@ -215,14 +190,16 @@ public class TsfKafkaLaneAspectTest {
}
@Test
public void testConsumerAspectWithLaneHeader() throws Throwable {
public void testConsumerAspectWithLaneHeader_LaneIdExist() throws Throwable {
// Given
kafkaLaneProperties.setLaneOn(true);
String laneId = "test-lane-id";
when(tsfActiveLane.isLaneExist(laneId)).thenReturn(true);
Field currentGroupLaneIdsField = TsfActiveLane.class.getDeclaredField("currentGroupLaneIds");
currentGroupLaneIdsField.setAccessible(true);
currentGroupLaneIdsField.set(tsfActiveLane, new HashSet<>(Collections.singletonList("lane-test")));
String laneId = "tsf/lane-test"; // valid lane id
ConsumerRecord consumerRecord = new ConsumerRecord<>("test-topic", 0, 0L, "key", "value");
consumerRecord.headers().add("tsf_laneId", laneId.getBytes(StandardCharsets.UTF_8));
consumerRecord.headers().add(tsfActiveLane.getLaneHeaderKey(), laneId.getBytes(StandardCharsets.UTF_8));
ProceedingJoinPoint pjp = mock(ProceedingJoinPoint.class);
Object[] args = new Object[] {consumerRecord};
@ -233,7 +210,7 @@ public class TsfKafkaLaneAspectTest {
Object result = kafkaLaneAspect.aroundConsumerMessage(pjp);
// Then
assertThat(result).isEqualTo(KafkaLaneAspect.EMPTY_OBJECT);
assertThat(result).isEqualTo("result");
}
@Test
@ -241,7 +218,7 @@ public class TsfKafkaLaneAspectTest {
// Given
ConsumerRecord consumerRecord = new ConsumerRecord<>("test-topic", 0, 0L, "key", "value");
String expectedLaneId = "test-lane-id";
consumerRecord.headers().add("tsf_laneId", expectedLaneId.getBytes(StandardCharsets.UTF_8));
consumerRecord.headers().add(tsfActiveLane.getLaneHeaderKey(), expectedLaneId.getBytes(StandardCharsets.UTF_8));
// When
String laneId = kafkaLaneAspect.getConsumerRecordLaneId(consumerRecord);
@ -265,38 +242,25 @@ public class TsfKafkaLaneAspectTest {
}
@Test
public void testIfConsumeInPolarisWithNoLaneId() {
// Given
kafkaLaneProperties.setLaneConsumeMain(true);
public void testIfConsumeWithNoLaneId() throws Exception {
// Use reflection to set the lane field
try {
Field laneField = KafkaLaneAspect.class.getDeclaredField("lane");
laneField.setAccessible(true);
laneField.set(kafkaLaneAspect, "test-lane");
}
catch (Exception e) {
throw new RuntimeException(e);
}
Field currentGroupLaneIdsField = TsfActiveLane.class.getDeclaredField("currentGroupLaneIds");
currentGroupLaneIdsField.setAccessible(true);
currentGroupLaneIdsField.set(tsfActiveLane, new HashSet<>(Arrays.asList("lane1", "lane2")));
// Given
kafkaLaneProperties.setLaneConsumeMain(true);
// When
boolean shouldConsume = kafkaLaneAspect.ifConsume("");
// Then
assertThat(shouldConsume).isTrue(); // Because laneConsumeMain is true
}
@Test
public void testIfConsumeInTsfWithNoLaneId() {
// Given
kafkaLaneProperties.setLaneConsumeMain(true);
when(tsfActiveLane.getCurrentGroupLaneIds()).thenReturn(Collections.emptySet());
kafkaLaneProperties.setLaneConsumeMain(false);
// When
boolean shouldConsume = kafkaLaneAspect.ifConsume("");
shouldConsume = kafkaLaneAspect.ifConsume("");
// Then
assertThat(shouldConsume).isTrue(); // Because laneConsumeMain is true
assertThat(shouldConsume).isFalse(); // Because laneConsumeMain is false
}
@Test
@ -306,9 +270,9 @@ public class TsfKafkaLaneAspectTest {
List<ConsumerRecord> messageList = new ArrayList<>();
ConsumerRecord record1 = new ConsumerRecord<>("test-topic", 0, 0L, "key", "value1");
record1.headers().add("tsf_laneId", "lane1".getBytes(StandardCharsets.UTF_8));
record1.headers().add(tsfActiveLane.getLaneHeaderKey(), "lane1".getBytes(StandardCharsets.UTF_8));
ConsumerRecord record2 = new ConsumerRecord<>("test-topic", 0, 1L, "key", "value2");
record2.headers().add("tsf_laneId", "lane2".getBytes(StandardCharsets.UTF_8));
record2.headers().add(tsfActiveLane.getLaneHeaderKey(), "lane2".getBytes(StandardCharsets.UTF_8));
messageList.add(record1);
messageList.add(record2);

@ -0,0 +1,347 @@
/*
* Tencent is pleased to support the open source community by making spring-cloud-tencent available.
*
* Copyright (C) 2021 Tencent. All rights reserved.
*
* Licensed under the BSD 3-Clause License (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.tencent.cloud.plugin.mq.lane.kafka;
import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import com.tencent.cloud.common.tsf.TsfContextUtils;
import com.tencent.cloud.plugin.mq.lane.tsf.TsfActiveLane;
import com.tencent.cloud.polaris.context.PolarisSDKContextManager;
import com.tencent.cloud.polaris.discovery.PolarisDiscoveryHandler;
import com.tencent.polaris.plugins.router.lane.LaneUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.aspectj.lang.ProceedingJoinPoint;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
/**
* Test for {@link KafkaLaneAspect}, with only tsf consul enabled. Instance not in lane.
*/
public class TsfKafkaLaneAspectTest2 {
private KafkaLaneAspect kafkaLaneAspect;
private PolarisSDKContextManager polarisSDKContextManager;
private KafkaLaneProperties kafkaLaneProperties;
private TsfActiveLane tsfActiveLane;
private MockedStatic<LaneUtils> laneUtilsMockedStatic;
private MockedStatic<TsfContextUtils> tsfContextUtilsMockedStatic;
@BeforeEach
public void setUp() throws Exception {
polarisSDKContextManager = mock(PolarisSDKContextManager.class);
kafkaLaneProperties = new KafkaLaneProperties();
tsfActiveLane = new TsfActiveLane(polarisSDKContextManager, mock(PolarisDiscoveryHandler.class));
laneUtilsMockedStatic = Mockito.mockStatic(LaneUtils.class);
tsfContextUtilsMockedStatic = Mockito.mockStatic(TsfContextUtils.class);
tsfContextUtilsMockedStatic.when(TsfContextUtils::isOnlyTsfConsulEnabled).thenReturn(true);
// reset currentGroupLaneIds
Field currentGroupLaneIdsField = TsfActiveLane.class.getDeclaredField("currentGroupLaneIds");
currentGroupLaneIdsField.setAccessible(true);
currentGroupLaneIdsField.set(tsfActiveLane, new HashSet<>());
kafkaLaneAspect = new KafkaLaneAspect(polarisSDKContextManager, kafkaLaneProperties, tsfActiveLane);
}
@AfterEach
public void tearDown() {
laneUtilsMockedStatic.close();
tsfContextUtilsMockedStatic.close();
}
@Test
public void testProducerAspectWhenLaneDisabled() throws Throwable {
// Given
kafkaLaneProperties.setLaneOn(false);
ProceedingJoinPoint pjp = mock(ProceedingJoinPoint.class);
Object[] args = new Object[] {mock(ProducerRecord.class)};
when(pjp.getArgs()).thenReturn(args);
when(pjp.proceed(args)).thenReturn("result");
// When
Object result = kafkaLaneAspect.aroundProducerMessage(pjp);
// Then
assertThat(result).isEqualTo("result");
verify(pjp).proceed(args);
}
@Test
public void testProducerAspectWhenNoLaneId() throws Throwable {
// Given
kafkaLaneProperties.setLaneOn(true);
laneUtilsMockedStatic.when(() -> LaneUtils.fetchLaneByCaller(any(), any(), any())).thenReturn(null);
ProceedingJoinPoint pjp = mock(ProceedingJoinPoint.class);
Object[] args = new Object[] {mock(ProducerRecord.class)};
when(pjp.getArgs()).thenReturn(args);
when(pjp.proceed(args)).thenReturn("result");
// When
Object result = kafkaLaneAspect.aroundProducerMessage(pjp);
// Then
assertThat(result).isEqualTo("result");
verify(pjp).proceed(args);
}
@Test
public void testProducerAspectWithProducerRecord() throws Throwable {
// Given
kafkaLaneProperties.setLaneOn(true);
String laneId = "test-lane-id";
laneUtilsMockedStatic.when(() -> LaneUtils.fetchLaneByCaller(any(), any(), any())).thenReturn(laneId);
ProceedingJoinPoint pjp = mock(ProceedingJoinPoint.class);
KafkaTemplate kafkaTemplate = mock(KafkaTemplate.class);
ProducerRecord producerRecord = new ProducerRecord<>("test-topic", "test-value");
when(pjp.getTarget()).thenReturn(kafkaTemplate);
when(pjp.getArgs()).thenReturn(new Object[] {producerRecord});
when(kafkaTemplate.send(producerRecord)).thenReturn(null);
// When
kafkaLaneAspect.aroundProducerMessage(pjp);
// Then
Iterator<Header> headers = producerRecord.headers().headers(tsfActiveLane.getLaneHeaderKey()).iterator();
assertThat(headers.hasNext()).isTrue();
Header laneHeader = headers.next();
assertThat(new String(laneHeader.value(), StandardCharsets.UTF_8)).isEqualTo(laneId);
}
@Test
public void testProducerAspectWithMessage() throws Throwable {
// Given
kafkaLaneProperties.setLaneOn(true);
String laneId = "test-lane-id";
laneUtilsMockedStatic.when(() -> LaneUtils.fetchLaneByCaller(any(), any(), any())).thenReturn(laneId);
ProceedingJoinPoint pjp = mock(ProceedingJoinPoint.class);
KafkaTemplate kafkaTemplate = mock(KafkaTemplate.class);
RecordMessageConverter recordMessageConverter = mock(RecordMessageConverter.class);
Message message = MessageBuilder.withPayload("test-payload").build();
ProducerRecord producerRecord = new ProducerRecord<>("test-topic", "test-value");
when(pjp.getTarget()).thenReturn(kafkaTemplate);
when(pjp.getArgs()).thenReturn(new Object[] {message});
when(kafkaTemplate.getDefaultTopic()).thenReturn("test-topic");
when(kafkaTemplate.send(producerRecord)).thenReturn(null);
when(kafkaTemplate.getMessageConverter()).thenReturn(recordMessageConverter);
// When
kafkaLaneAspect.aroundProducerMessage(pjp);
}
@Test
public void testConsumerAspectWhenLaneDisabled() throws Throwable {
// Given
kafkaLaneProperties.setLaneOn(false);
ProceedingJoinPoint pjp = mock(ProceedingJoinPoint.class);
Object[] args = new Object[] {mock(ConsumerRecord.class)};
when(pjp.getArgs()).thenReturn(args);
when(pjp.proceed(args)).thenReturn("result");
// When
Object result = kafkaLaneAspect.aroundConsumerMessage(pjp);
// Then
assertThat(result).isEqualTo("result");
verify(pjp).proceed(args);
}
@Test
public void testConsumerAspectWithLaneHeader_LaneIdExist() throws Throwable {
// Given
kafkaLaneProperties.setLaneOn(true);
Field laneActiveMapField = TsfActiveLane.class.getDeclaredField("laneActiveMap");
laneActiveMapField.setAccessible(true);
HashMap<String, Boolean> laneActiveMap = new HashMap<>();
laneActiveMap.put("lane-test", true);
laneActiveMapField.set(tsfActiveLane, laneActiveMap);
String laneId = "tsf/lane-test"; // valid lane id
ConsumerRecord consumerRecord = new ConsumerRecord<>("test-topic", 0, 0L, "key", "value");
consumerRecord.headers().add(tsfActiveLane.getLaneHeaderKey(), laneId.getBytes(StandardCharsets.UTF_8));
ProceedingJoinPoint pjp = mock(ProceedingJoinPoint.class);
Object[] args = new Object[] {consumerRecord};
when(pjp.getArgs()).thenReturn(args);
when(pjp.proceed(args)).thenReturn("result");
// When
Object result = kafkaLaneAspect.aroundConsumerMessage(pjp);
// Then
assertThat(result).isEqualTo(KafkaLaneAspect.EMPTY_OBJECT);
}
@Test
public void testConsumerAspectWithLaneHeader_LaneIdExist2() throws Throwable {
// Given
kafkaLaneProperties.setLaneOn(true);
Field laneActiveMapField = TsfActiveLane.class.getDeclaredField("laneActiveMap");
laneActiveMapField.setAccessible(true);
HashMap<String, Boolean> laneActiveMap = new HashMap<>();
laneActiveMap.put("lane-test", false);
laneActiveMapField.set(tsfActiveLane, laneActiveMap);
String laneId = "tsf/lane-test"; // valid lane id
ConsumerRecord consumerRecord = new ConsumerRecord<>("test-topic", 0, 0L, "key", "value");
consumerRecord.headers().add(tsfActiveLane.getLaneHeaderKey(), laneId.getBytes(StandardCharsets.UTF_8));
ProceedingJoinPoint pjp = mock(ProceedingJoinPoint.class);
Object[] args = new Object[] {consumerRecord};
when(pjp.getArgs()).thenReturn(args);
when(pjp.proceed(args)).thenReturn("result");
// When
Object result = kafkaLaneAspect.aroundConsumerMessage(pjp);
// Then
assertThat(result).isEqualTo(KafkaLaneAspect.EMPTY_OBJECT);
kafkaLaneProperties.setMainConsumeLane(true);
result = kafkaLaneAspect.aroundConsumerMessage(pjp);
assertThat(result).isEqualTo("result"); // as mainConsumeLane is true
// reset mainConsumeLane
kafkaLaneProperties.setMainConsumeLane(false);
}
@Test
public void testGetConsumerRecordLaneIdFromHeader() {
// Given
ConsumerRecord consumerRecord = new ConsumerRecord<>("test-topic", 0, 0L, "key", "value");
String expectedLaneId = "test-lane-id";
consumerRecord.headers().add(tsfActiveLane.getLaneHeaderKey(), expectedLaneId.getBytes(StandardCharsets.UTF_8));
// When
String laneId = kafkaLaneAspect.getConsumerRecordLaneId(consumerRecord);
// Then
assertThat(laneId).isEqualTo(expectedLaneId);
}
@Test
public void testGetConsumerRecordLaneIdFromCallerLane() {
// Given
ConsumerRecord consumerRecord = new ConsumerRecord<>("test-topic", 0, 0L, "key", "value");
String expectedLaneId = "lane-test";
laneUtilsMockedStatic.when(() -> LaneUtils.getCallerLaneId()).thenReturn(expectedLaneId);
// When
String laneId = kafkaLaneAspect.getConsumerRecordLaneId(consumerRecord);
// Then
assertThat(laneId).isEqualTo("tsf/" + expectedLaneId);
}
/**
* instance not in lane, act as baseline service.
*/
@Test
public void testIfConsumeWithNoLaneId() throws Exception {
// Given
kafkaLaneProperties.setLaneConsumeMain(true);
// When
boolean shouldConsume = kafkaLaneAspect.ifConsume("");
// Then
assertThat(shouldConsume).isTrue(); // baseline service
// Given
kafkaLaneProperties.setLaneConsumeMain(false);
// When
shouldConsume = kafkaLaneAspect.ifConsume("");
// Then
assertThat(shouldConsume).isTrue(); // unrelated to LaneConsumeMain
}
@Test
public void testConsumerAspectWithBatchMessages() throws Throwable {
// Given
kafkaLaneProperties.setLaneOn(true);
List<ConsumerRecord> messageList = new ArrayList<>();
ConsumerRecord record1 = new ConsumerRecord<>("test-topic", 0, 0L, "key", "value1");
record1.headers().add(tsfActiveLane.getLaneHeaderKey(), "lane1".getBytes(StandardCharsets.UTF_8));
ConsumerRecord record2 = new ConsumerRecord<>("test-topic", 0, 1L, "key", "value2");
record2.headers().add(tsfActiveLane.getLaneHeaderKey(), "lane2".getBytes(StandardCharsets.UTF_8));
messageList.add(record1);
messageList.add(record2);
Acknowledgment acknowledgment = mock(Acknowledgment.class);
ProceedingJoinPoint pjp = mock(ProceedingJoinPoint.class);
Object[] args = new Object[] {messageList, acknowledgment};
when(pjp.getArgs()).thenReturn(args);
when(pjp.proceed(any())).thenReturn("result");
// When
Object result = kafkaLaneAspect.aroundConsumerMessage(pjp);
// Then
assertThat(result).isEqualTo("result");
verify(pjp).proceed(any());
}
@Test
public void testConsumerAspectWithEmptyBatch() throws Throwable {
// Given
kafkaLaneProperties.setLaneOn(true);
List<ConsumerRecord> emptyList = new ArrayList<>();
ProceedingJoinPoint pjp = mock(ProceedingJoinPoint.class);
Object[] args = new Object[] {emptyList};
when(pjp.getArgs()).thenReturn(args);
when(pjp.proceed(args)).thenReturn("result");
// When
Object result = kafkaLaneAspect.aroundConsumerMessage(pjp);
// Then
assertThat(result).isEqualTo("result");
verify(pjp).proceed(args);
}
}

@ -0,0 +1,149 @@
/*
* Tencent is pleased to support the open source community by making spring-cloud-tencent available.
*
* Copyright (C) 2021 Tencent. All rights reserved.
*
* Licensed under the BSD 3-Clause License (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.tencent.cloud.plugin.mq.lane.kafka;
import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import com.tencent.cloud.common.tsf.TsfContextUtils;
import com.tencent.cloud.plugin.mq.lane.tsf.TsfActiveLane;
import com.tencent.cloud.polaris.context.PolarisSDKContextManager;
import com.tencent.cloud.polaris.discovery.PolarisDiscoveryHandler;
import com.tencent.polaris.plugins.router.lane.LaneUtils;
import com.tencent.polaris.specification.api.v1.traffic.manage.LaneProto;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.aspectj.lang.ProceedingJoinPoint;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
/**
* Test for {@link KafkaLaneAspect}.
* Instance and service status change.
*/
public class TsfKafkaLaneAspectTest3 {
private KafkaLaneAspect kafkaLaneAspect;
private PolarisSDKContextManager polarisSDKContextManager;
private KafkaLaneProperties kafkaLaneProperties;
private TsfActiveLane tsfActiveLane;
private LaneProto.LaneGroup group;
private MockedStatic<LaneUtils> laneUtilsMockedStatic;
private MockedStatic<TsfContextUtils> tsfContextUtilsMockedStatic;
@BeforeEach
public void setUp() throws Exception {
polarisSDKContextManager = mock(PolarisSDKContextManager.class);
group = mock(LaneProto.LaneGroup.class);
kafkaLaneProperties = new KafkaLaneProperties();
kafkaLaneProperties.setLaneOn(true);
tsfActiveLane = new TsfActiveLane(mock(PolarisSDKContextManager.class), mock(PolarisDiscoveryHandler.class));
laneUtilsMockedStatic = Mockito.mockStatic(LaneUtils.class);
tsfContextUtilsMockedStatic = Mockito.mockStatic(TsfContextUtils.class);
kafkaLaneAspect = new KafkaLaneAspect(polarisSDKContextManager, kafkaLaneProperties, tsfActiveLane);
}
@AfterEach
public void tearDown() {
laneUtilsMockedStatic.close();
tsfContextUtilsMockedStatic.close();
}
@Test
public void testConsumerAspectWithLaneHeader_toInLane() throws Throwable {
// not in lane
Field laneActiveMapField = TsfActiveLane.class.getDeclaredField("laneActiveMap");
laneActiveMapField.setAccessible(true);
HashMap<String, Boolean> laneActiveMap = new HashMap<>();
laneActiveMap.put("lane-name", true);
laneActiveMapField.set(tsfActiveLane, laneActiveMap);
String laneId = "tsf/lane-name"; // valid lane id
ConsumerRecord consumerRecord = new ConsumerRecord<>("test-topic", 0, 0L, "key", "value");
consumerRecord.headers().add(tsfActiveLane.getLaneHeaderKey(), laneId.getBytes(StandardCharsets.UTF_8));
ProceedingJoinPoint pjp = mock(ProceedingJoinPoint.class);
Object[] args = new Object[] {consumerRecord};
when(pjp.getArgs()).thenReturn(args);
when(pjp.proceed(args)).thenReturn("result");
// act
Object result = kafkaLaneAspect.aroundConsumerMessage(pjp);
// verify
assertThat(result).isEqualTo(KafkaLaneAspect.EMPTY_OBJECT); // not in lane, not consume
// in lane
Field currentGroupLaneIdsField = TsfActiveLane.class.getDeclaredField("currentGroupLaneIds");
currentGroupLaneIdsField.setAccessible(true);
currentGroupLaneIdsField.set(tsfActiveLane, new HashSet<>(Collections.singletonList("lane-name")));
result = kafkaLaneAspect.aroundConsumerMessage(pjp);
assertThat(result).isEqualTo("result");
}
@Test
public void testConsumerAspectWithLaneHeader_toNotInLane() throws Throwable {
// in lane
String laneId = "tsf/lane-name"; // valid lane id
Field currentGroupLaneIdsField = TsfActiveLane.class.getDeclaredField("currentGroupLaneIds");
currentGroupLaneIdsField.setAccessible(true);
currentGroupLaneIdsField.set(tsfActiveLane, new HashSet<>(Collections.singletonList("lane-name")));
Field laneActiveMapField = TsfActiveLane.class.getDeclaredField("laneActiveMap");
laneActiveMapField.setAccessible(true);
HashMap<String, Boolean> laneActiveMap = new HashMap<>();
laneActiveMap.put("lane-name", true);
laneActiveMapField.set(tsfActiveLane, laneActiveMap);
ConsumerRecord consumerRecord = new ConsumerRecord<>("test-topic", 0, 0L, "key", "value");
consumerRecord.headers().add(tsfActiveLane.getLaneHeaderKey(), laneId.getBytes(StandardCharsets.UTF_8));
ProceedingJoinPoint pjp = mock(ProceedingJoinPoint.class);
Object[] args = new Object[] {consumerRecord};
when(pjp.getArgs()).thenReturn(args);
when(pjp.proceed(args)).thenReturn("result");
// act
Object result = kafkaLaneAspect.aroundConsumerMessage(pjp);
// verify
assertThat(result).isEqualTo("result");
// not in lane
currentGroupLaneIdsField.set(tsfActiveLane, new HashSet<>());
result = kafkaLaneAspect.aroundConsumerMessage(pjp);
assertThat(result).isEqualTo(KafkaLaneAspect.EMPTY_OBJECT);
}
}

@ -17,6 +17,7 @@
package com.tencent.cloud.plugin.mq.lane.tsf;
import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
@ -26,6 +27,8 @@ import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import com.tencent.cloud.common.util.JacksonUtils;
import com.tencent.cloud.plugin.mq.lane.kafka.KafkaLaneProperties;
import com.tencent.cloud.polaris.context.PolarisSDKContextManager;
import com.tencent.cloud.polaris.discovery.PolarisDiscoveryHandler;
import com.tencent.polaris.api.plugin.compose.Extensions;
@ -46,8 +49,6 @@ import org.springframework.test.util.ReflectionTestUtils;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
/**
@ -58,9 +59,11 @@ public class TsfActiveLaneTest {
private TsfActiveLane tsfActiveLane;
private PolarisSDKContextManager polarisSDKContextManager;
private PolarisDiscoveryHandler discoveryClient;
private KafkaLaneProperties kafkaLaneProperties;
private SDKContext sdkContext;
private Extensions extensions;
private MockedStatic<LaneUtils> laneUtilsMockedStatic;
private MockedStatic<JacksonUtils> jacksonUtilsMockedStatic;
@BeforeEach
public void setUp() {
@ -68,14 +71,19 @@ public class TsfActiveLaneTest {
discoveryClient = mock(PolarisDiscoveryHandler.class);
sdkContext = mock(SDKContext.class);
extensions = mock(Extensions.class);
kafkaLaneProperties = new KafkaLaneProperties();
laneUtilsMockedStatic = Mockito.mockStatic(LaneUtils.class);
jacksonUtilsMockedStatic = Mockito.mockStatic(JacksonUtils.class);
jacksonUtilsMockedStatic.when(() -> JacksonUtils.serialize2Json(any())).thenReturn("{}");
when(polarisSDKContextManager.getSDKContext()).thenReturn(sdkContext);
when(sdkContext.getExtensions()).thenReturn(extensions);
tsfActiveLane = new TsfActiveLane(polarisSDKContextManager, discoveryClient);
// Set up field values using reflection
ReflectionTestUtils.setField(tsfActiveLane, "tsfNamespaceId", "test-namespace");
ReflectionTestUtils.setField(tsfActiveLane, "tsfGroupId", "test-group");
@ -86,25 +94,16 @@ public class TsfActiveLaneTest {
@AfterEach
public void tearDown() {
laneUtilsMockedStatic.close();
}
@Test
public void testConstructorInitialization() {
// Verify that constructor properly initializes dependencies
assertThat(tsfActiveLane).isNotNull();
verify(polarisSDKContextManager, times(1)).getSDKContext();
verify(sdkContext, times(1)).getExtensions();
jacksonUtilsMockedStatic.close();
}
@Test
public void testCallbackWithEmptyInstances() {
// Given
List<Instance> currentInstances = Collections.emptyList();
List<Instance> addInstances = Collections.emptyList();
List<Instance> deleteInstances = Collections.emptyList();
// When
tsfActiveLane.callback(currentInstances, addInstances, deleteInstances);
tsfActiveLane.callback(currentInstances);
// Then
// Should not throw any exceptions and handle empty instances gracefully
@ -236,6 +235,68 @@ public class TsfActiveLaneTest {
assertThat(activeGroupSet).contains("group1");
}
@Test
public void testCallback() throws Throwable {
Field tsfNamespaceIdField = TsfActiveLane.class.getDeclaredField("tsfNamespaceId");
tsfNamespaceIdField.setAccessible(true);
tsfNamespaceIdField.set(tsfActiveLane, "ns1");
Field tsfGroupIdField = TsfActiveLane.class.getDeclaredField("tsfGroupId");
tsfGroupIdField.setAccessible(true);
tsfGroupIdField.set(tsfActiveLane, "group1");
Field tsfApplicationIdField = TsfActiveLane.class.getDeclaredField("tsfApplicationId");
tsfApplicationIdField.setAccessible(true);
tsfApplicationIdField.set(tsfActiveLane, "app1");
// not in lane
assertThat(tsfActiveLane.getCurrentGroupLaneIds()).isEmpty();
// in lane
// given
LaneProto.LaneGroup group = mock(LaneProto.LaneGroup.class);
LaneProto.LaneRule laneRule = mock(LaneProto.LaneRule.class);
Map<String, String> metadataMap = new HashMap<>();
metadataMap.put("ns1,app1", TsfMetadataConstants.TSF_NAMESPACE_ID + "," + TsfMetadataConstants.TSF_APPLICATION_ID);
when(group.getMetadataMap()).thenReturn(metadataMap);
when(group.getRulesList()).thenReturn(Collections.singletonList(laneRule));
when(laneRule.getLabelKey()).thenReturn(TsfMetadataConstants.TSF_GROUP_ID);
when(laneRule.getDefaultLabelValue()).thenReturn("group1");
when(laneRule.getId()).thenReturn("lane1");
laneUtilsMockedStatic.when(() -> LaneUtils.getLaneGroups(any(), any()))
.thenReturn(Collections.singletonList(group));
Map<String, String> metadata = new HashMap<>();
metadata.put("TSF_NAMESPACE_ID", "ns1");
metadata.put("TSF_GROUP_ID", "group1");
metadata.put("TSF_APPLICATION_ID", "app1");
Instance instance = mock(Instance.class);
when(instance.getMetadata()).thenReturn(metadata);
// act
tsfActiveLane.callback(Collections.singletonList(instance));
assertThat(tsfActiveLane.getCurrentGroupLaneIds().contains("lane1")).isTrue();
// instance not in lane, change the rule
when(laneRule.getDefaultLabelValue()).thenReturn("group2");
tsfActiveLane.freshLaneStatus(); // rule listener will call this method
assertThat(tsfActiveLane.getCurrentGroupLaneIds().contains("lane1")).isFalse();
// reset, instance in lane
when(laneRule.getDefaultLabelValue()).thenReturn("group1");
tsfActiveLane.freshLaneStatus(); // rule listener will call this method
assertThat(tsfActiveLane.getCurrentGroupLaneIds().contains("lane1")).isTrue();
// service not in lane
metadataMap.clear();
metadataMap.put("ns1,app2", TsfMetadataConstants.TSF_NAMESPACE_ID + "," + TsfMetadataConstants.TSF_APPLICATION_ID);
when(laneRule.getDefaultLabelValue()).thenReturn("group2");
tsfActiveLane.freshLaneStatus(); // rule listener will call this method
assertThat(tsfActiveLane.getCurrentGroupLaneIds().contains("lane1")).isFalse();
}
private Map<String, String> createMetadata(String namespaceId, String groupId, String applicationId) {
Map<String, String> metadata = new HashMap<>();
metadata.put("TSF_NAMESPACE_ID", namespaceId);

Loading…
Cancel
Save