diff --git a/spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-mq-plugin/src/main/java/com/tencent/cloud/plugin/mq/lane/PolarisActiveLane.java b/spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-mq-plugin/src/main/java/com/tencent/cloud/plugin/mq/lane/PolarisActiveLane.java index dd4214c69..501288a2c 100644 --- a/spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-mq-plugin/src/main/java/com/tencent/cloud/plugin/mq/lane/PolarisActiveLane.java +++ b/spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-mq-plugin/src/main/java/com/tencent/cloud/plugin/mq/lane/PolarisActiveLane.java @@ -26,13 +26,18 @@ import com.tencent.cloud.common.util.JacksonUtils; import com.tencent.cloud.plugin.mq.lane.kafka.KafkaLaneProperties; import com.tencent.cloud.polaris.context.PolarisSDKContextManager; import com.tencent.cloud.polaris.discovery.PolarisDiscoveryHandler; +import com.tencent.polaris.api.config.Configuration; +import com.tencent.polaris.api.config.consumer.ConsumerConfig; +import com.tencent.polaris.api.config.consumer.ServiceRouterConfig; import com.tencent.polaris.api.plugin.compose.Extensions; import com.tencent.polaris.api.pojo.Instance; import com.tencent.polaris.api.pojo.ServiceKey; import com.tencent.polaris.api.utils.CollectionUtils; import com.tencent.polaris.api.utils.StringUtils; import com.tencent.polaris.client.api.SDKContext; +import com.tencent.polaris.plugins.router.lane.BaseLaneMode; import com.tencent.polaris.plugins.router.lane.LaneRouter; +import com.tencent.polaris.plugins.router.lane.LaneRouterConfig; import com.tencent.polaris.plugins.router.lane.LaneUtils; import com.tencent.polaris.specification.api.v1.traffic.manage.LaneProto; import org.slf4j.Logger; @@ -55,8 +60,10 @@ public class PolarisActiveLane extends AbstractActiveLane implements Initializin @Value("${spring.application.name:}") private String springApplicationName; - - private volatile String lane = ""; + /** + * current instance lane tag, related to baseLaneMode. + */ + private volatile String instanceLaneTag = ""; private volatile boolean serviceInLane = false; @@ -64,6 +71,8 @@ public class PolarisActiveLane extends AbstractActiveLane implements Initializin private Registration registration; + private BaseLaneMode baseLaneMode = BaseLaneMode.ONLY_UNTAGGED_INSTANCE; + public PolarisActiveLane(PolarisSDKContextManager polarisSDKContextManager, PolarisDiscoveryHandler discoveryClient, KafkaLaneProperties kafkaLaneProperties, Registration registration) { this.polarisSDKContextManager = polarisSDKContextManager; @@ -77,6 +86,14 @@ public class PolarisActiveLane extends AbstractActiveLane implements Initializin Optional.ofNullable(polarisSDKContextManager).map(PolarisSDKContextManager::getSDKContext) .map(SDKContext::getExtensions).map(Extensions::getLocalRegistry) .ifPresent(localRegistry -> localRegistry.registerResourceListener(this)); + + Optional.ofNullable(polarisSDKContextManager).map(PolarisSDKContextManager::getSDKContext) + .map(SDKContext::getConfig).map(Configuration::getConsumer).map(ConsumerConfig::getServiceRouter).ifPresent(serviceRouterConfig -> { + LaneRouterConfig laneRouterConfig = serviceRouterConfig.getPluginConfig(ServiceRouterConfig.DEFAULT_ROUTER_LANE, LaneRouterConfig.class); + if (laneRouterConfig != null) { + baseLaneMode = laneRouterConfig.getBaseLaneMode(); + } + }); } @Override @@ -94,24 +111,39 @@ public class PolarisActiveLane extends AbstractActiveLane implements Initializin freshWhenInstancesChange(currentServiceInstances); if (LOG.isDebugEnabled()) { - LOG.debug("current lane:{}, serviceInLane: {}", lane, serviceInLane); + LOG.debug("current instanceLaneTag:{}, serviceInLane: {}, baseLaneMode:{}", instanceLaneTag, serviceInLane, baseLaneMode); } } private void freshWhenInstancesChange(List currentServices) { - if (currentServices == null || currentServices.isEmpty()) { - return; - } + freshLaneStatus(); + String tempInstanceLaneTag = ""; // get all active groups - for (Instance healthService : currentServices) { - if (StringUtils.equals(healthService.getId(), registration.getInstanceId())) { - this.lane = healthService.getMetadata().get("lane"); - break; + if (CollectionUtils.isNotEmpty(currentServices)) { + for (Instance healthService : currentServices) { + if (StringUtils.equals(healthService.getId(), registration.getInstanceId())) { + tempInstanceLaneTag = healthService.getMetadata().get("lane"); + break; + } } } - freshLaneStatus(); + if (BaseLaneMode.ONLY_UNTAGGED_INSTANCE.equals(baseLaneMode)) { + instanceLaneTag = tempInstanceLaneTag; + } + else { + // if baseLaneMode is EXCLUDE_ENABLED_LANE_INSTANCE, check if the instance lane tag is in the lane + boolean laneTagExist = false; + for (LaneProto.LaneGroup group : getGroups()) { + for (LaneProto.LaneRule rule : group.getRulesList()) { + if (StringUtils.equals(rule.getDefaultLabelValue(), tempInstanceLaneTag)) { + laneTagExist = true; + } + } + } + instanceLaneTag = laneTagExist ? tempInstanceLaneTag : ""; + } } /** @@ -128,11 +160,11 @@ public class PolarisActiveLane extends AbstractActiveLane implements Initializin } public boolean currentInstanceInLane() { - return StringUtils.isNotEmpty(lane) && serviceInLane; + return StringUtils.isNotEmpty(instanceLaneTag) && serviceInLane; } - public String getLane() { - return lane; + public String getInstanceLaneTag() { + return instanceLaneTag; } public List getGroups() { @@ -165,7 +197,7 @@ public class PolarisActiveLane extends AbstractActiveLane implements Initializin for (LaneProto.LaneGroup group : getGroups()) { for (LaneProto.LaneRule rule : group.getRulesList()) { if (StringUtils.equals(messageLaneId, LaneUtils.buildStainLabel(rule)) - && StringUtils.equals(rule.getDefaultLabelValue(), getLane())) { + && StringUtils.equals(rule.getDefaultLabelValue(), getInstanceLaneTag())) { return true; } } diff --git a/spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-mq-plugin/src/test/java/com/tencent/cloud/plugin/mq/lane/PolarisActiveLaneTest.java b/spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-mq-plugin/src/test/java/com/tencent/cloud/plugin/mq/lane/PolarisActiveLaneTest.java index 35f70a811..2c8866e64 100644 --- a/spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-mq-plugin/src/test/java/com/tencent/cloud/plugin/mq/lane/PolarisActiveLaneTest.java +++ b/spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-mq-plugin/src/test/java/com/tencent/cloud/plugin/mq/lane/PolarisActiveLaneTest.java @@ -17,6 +17,7 @@ package com.tencent.cloud.plugin.mq.lane; +import java.lang.reflect.Field; import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -28,6 +29,7 @@ import com.tencent.cloud.polaris.context.PolarisSDKContextManager; import com.tencent.cloud.polaris.discovery.PolarisDiscoveryHandler; import com.tencent.polaris.api.pojo.Instance; import com.tencent.polaris.client.api.SDKContext; +import com.tencent.polaris.plugins.router.lane.BaseLaneMode; import com.tencent.polaris.plugins.router.lane.LaneUtils; import com.tencent.polaris.specification.api.v1.traffic.manage.LaneProto; import org.junit.jupiter.api.AfterEach; @@ -119,4 +121,60 @@ public class PolarisActiveLaneTest { polarisActiveLane.freshLaneStatus(); // rule listener will call this method assertThat(polarisActiveLane.currentInstanceInLane()).isFalse(); } + + @Test + public void testCallback_baseLaneMode1() throws Throwable { + Field baseLaneModeField = PolarisActiveLane.class.getDeclaredField("baseLaneMode"); + baseLaneModeField.setAccessible(true); + baseLaneModeField.set(polarisActiveLane, BaseLaneMode.EXCLUDE_ENABLED_LANE_INSTANCE); + + // not in lane + assertThat(polarisActiveLane.currentInstanceInLane()).isFalse(); + + // not in lane + Map metadata = new HashMap<>(); + metadata.put("lane", "lane-not-exist"); + + Instance instance = mock(Instance.class); + when(instance.getId()).thenReturn(CURRENT_INSTANCE_ID); + when(instance.getMetadata()).thenReturn(metadata); + laneUtilsMockedStatic.when(() -> LaneUtils.getLaneGroups(any(), any())) + .thenReturn(Collections.singletonList(group)); + + LaneProto.LaneRule rule = mock(LaneProto.LaneRule.class); + when(rule.getDefaultLabelValue()).thenReturn("lane-exist"); + when(group.getRulesList()).thenReturn(Collections.singletonList(rule)); + + polarisActiveLane.callback(Collections.singletonList(instance)); + + assertThat(polarisActiveLane.currentInstanceInLane()).isFalse(); + } + + @Test + public void testCallback_baseLaneMode2() throws Throwable { + Field baseLaneModeField = PolarisActiveLane.class.getDeclaredField("baseLaneMode"); + baseLaneModeField.setAccessible(true); + baseLaneModeField.set(polarisActiveLane, BaseLaneMode.EXCLUDE_ENABLED_LANE_INSTANCE); + + // not in lane + assertThat(polarisActiveLane.currentInstanceInLane()).isFalse(); + + // in lane + Map metadata = new HashMap<>(); + metadata.put("lane", "lane-exist"); + + Instance instance = mock(Instance.class); + when(instance.getId()).thenReturn(CURRENT_INSTANCE_ID); + when(instance.getMetadata()).thenReturn(metadata); + laneUtilsMockedStatic.when(() -> LaneUtils.getLaneGroups(any(), any())) + .thenReturn(Collections.singletonList(group)); + + LaneProto.LaneRule rule = mock(LaneProto.LaneRule.class); + when(rule.getDefaultLabelValue()).thenReturn("lane-exist"); + when(group.getRulesList()).thenReturn(Collections.singletonList(rule)); + + polarisActiveLane.callback(Collections.singletonList(instance)); + + assertThat(polarisActiveLane.currentInstanceInLane()).isTrue(); + } } diff --git a/spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-mq-plugin/src/test/java/com/tencent/cloud/plugin/mq/lane/kafka/KafkaLaneAspectTest.java b/spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-mq-plugin/src/test/java/com/tencent/cloud/plugin/mq/lane/kafka/KafkaLaneAspectTest.java index b21efe0c3..3df412c00 100644 --- a/spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-mq-plugin/src/test/java/com/tencent/cloud/plugin/mq/lane/kafka/KafkaLaneAspectTest.java +++ b/spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-mq-plugin/src/test/java/com/tencent/cloud/plugin/mq/lane/kafka/KafkaLaneAspectTest.java @@ -74,7 +74,7 @@ public class KafkaLaneAspectTest { polarisActiveLane = new PolarisActiveLane(mock(PolarisSDKContextManager.class), mock(PolarisDiscoveryHandler.class), kafkaLaneProperties, mock(Registration.class)); laneUtilsMockedStatic = Mockito.mockStatic(LaneUtils.class); tsfContextUtilsMockedStatic = Mockito.mockStatic(TsfContextUtils.class); - Field laneField = PolarisActiveLane.class.getDeclaredField("lane"); + Field laneField = PolarisActiveLane.class.getDeclaredField("instanceLaneTag"); laneField.setAccessible(true); laneField.set(polarisActiveLane, "test-lane"); // in lane diff --git a/spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-mq-plugin/src/test/java/com/tencent/cloud/plugin/mq/lane/kafka/KafkaLaneAspectTest2.java b/spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-mq-plugin/src/test/java/com/tencent/cloud/plugin/mq/lane/kafka/KafkaLaneAspectTest2.java index fd775d144..e09fc0503 100644 --- a/spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-mq-plugin/src/test/java/com/tencent/cloud/plugin/mq/lane/kafka/KafkaLaneAspectTest2.java +++ b/spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-mq-plugin/src/test/java/com/tencent/cloud/plugin/mq/lane/kafka/KafkaLaneAspectTest2.java @@ -75,7 +75,7 @@ public class KafkaLaneAspectTest2 { laneUtilsMockedStatic = Mockito.mockStatic(LaneUtils.class); tsfContextUtilsMockedStatic = Mockito.mockStatic(TsfContextUtils.class); - Field laneField = PolarisActiveLane.class.getDeclaredField("lane"); + Field laneField = PolarisActiveLane.class.getDeclaredField("instanceLaneTag"); laneField.setAccessible(true); laneField.set(polarisActiveLane, ""); // not in lane diff --git a/spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-mq-plugin/src/test/java/com/tencent/cloud/plugin/mq/lane/kafka/KafkaLaneAspectTest3.java b/spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-mq-plugin/src/test/java/com/tencent/cloud/plugin/mq/lane/kafka/KafkaLaneAspectTest3.java index e9659a4f3..431846770 100644 --- a/spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-mq-plugin/src/test/java/com/tencent/cloud/plugin/mq/lane/kafka/KafkaLaneAspectTest3.java +++ b/spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-mq-plugin/src/test/java/com/tencent/cloud/plugin/mq/lane/kafka/KafkaLaneAspectTest3.java @@ -78,7 +78,7 @@ public class KafkaLaneAspectTest3 { @Test public void testConsumerAspectWithLaneHeader_toInLane() throws Throwable { // not in lane - Field laneField = PolarisActiveLane.class.getDeclaredField("lane"); + Field laneField = PolarisActiveLane.class.getDeclaredField("instanceLaneTag"); laneField.setAccessible(true); laneField.set(polarisActiveLane, ""); @@ -133,7 +133,7 @@ public class KafkaLaneAspectTest3 { when(rule.getDefaultLabelValue()).thenReturn("test-lane"); laneUtilsMockedStatic.when(() -> LaneUtils.buildStainLabel(rule)).thenReturn(laneId); - Field laneField = PolarisActiveLane.class.getDeclaredField("lane"); + Field laneField = PolarisActiveLane.class.getDeclaredField("instanceLaneTag"); laneField.setAccessible(true); laneField.set(polarisActiveLane, "test-lane");