适配 lane mode

pull/1784/head
shedfreewu 3 weeks ago
parent a51c2421d0
commit 1aaa8e2323

@ -26,13 +26,18 @@ import com.tencent.cloud.common.util.JacksonUtils;
import com.tencent.cloud.plugin.mq.lane.kafka.KafkaLaneProperties;
import com.tencent.cloud.polaris.context.PolarisSDKContextManager;
import com.tencent.cloud.polaris.discovery.PolarisDiscoveryHandler;
import com.tencent.polaris.api.config.Configuration;
import com.tencent.polaris.api.config.consumer.ConsumerConfig;
import com.tencent.polaris.api.config.consumer.ServiceRouterConfig;
import com.tencent.polaris.api.plugin.compose.Extensions;
import com.tencent.polaris.api.pojo.Instance;
import com.tencent.polaris.api.pojo.ServiceKey;
import com.tencent.polaris.api.utils.CollectionUtils;
import com.tencent.polaris.api.utils.StringUtils;
import com.tencent.polaris.client.api.SDKContext;
import com.tencent.polaris.plugins.router.lane.BaseLaneMode;
import com.tencent.polaris.plugins.router.lane.LaneRouter;
import com.tencent.polaris.plugins.router.lane.LaneRouterConfig;
import com.tencent.polaris.plugins.router.lane.LaneUtils;
import com.tencent.polaris.specification.api.v1.traffic.manage.LaneProto;
import org.slf4j.Logger;
@ -55,8 +60,10 @@ public class PolarisActiveLane extends AbstractActiveLane implements Initializin
@Value("${spring.application.name:}")
private String springApplicationName;
private volatile String lane = "";
/**
* current instance lane tag, related to baseLaneMode.
*/
private volatile String instanceLaneTag = "";
private volatile boolean serviceInLane = false;
@ -64,6 +71,8 @@ public class PolarisActiveLane extends AbstractActiveLane implements Initializin
private Registration registration;
private BaseLaneMode baseLaneMode = BaseLaneMode.ONLY_UNTAGGED_INSTANCE;
public PolarisActiveLane(PolarisSDKContextManager polarisSDKContextManager, PolarisDiscoveryHandler discoveryClient,
KafkaLaneProperties kafkaLaneProperties, Registration registration) {
this.polarisSDKContextManager = polarisSDKContextManager;
@ -77,6 +86,14 @@ public class PolarisActiveLane extends AbstractActiveLane implements Initializin
Optional.ofNullable(polarisSDKContextManager).map(PolarisSDKContextManager::getSDKContext)
.map(SDKContext::getExtensions).map(Extensions::getLocalRegistry)
.ifPresent(localRegistry -> localRegistry.registerResourceListener(this));
Optional.ofNullable(polarisSDKContextManager).map(PolarisSDKContextManager::getSDKContext)
.map(SDKContext::getConfig).map(Configuration::getConsumer).map(ConsumerConfig::getServiceRouter).ifPresent(serviceRouterConfig -> {
LaneRouterConfig laneRouterConfig = serviceRouterConfig.getPluginConfig(ServiceRouterConfig.DEFAULT_ROUTER_LANE, LaneRouterConfig.class);
if (laneRouterConfig != null) {
baseLaneMode = laneRouterConfig.getBaseLaneMode();
}
});
}
@Override
@ -94,24 +111,39 @@ public class PolarisActiveLane extends AbstractActiveLane implements Initializin
freshWhenInstancesChange(currentServiceInstances);
if (LOG.isDebugEnabled()) {
LOG.debug("current lane:{}, serviceInLane: {}", lane, serviceInLane);
LOG.debug("current instanceLaneTag:{}, serviceInLane: {}, baseLaneMode:{}", instanceLaneTag, serviceInLane, baseLaneMode);
}
}
private void freshWhenInstancesChange(List<Instance> currentServices) {
if (currentServices == null || currentServices.isEmpty()) {
return;
}
freshLaneStatus();
String tempInstanceLaneTag = "";
// get all active groups
for (Instance healthService : currentServices) {
if (StringUtils.equals(healthService.getId(), registration.getInstanceId())) {
this.lane = healthService.getMetadata().get("lane");
break;
if (CollectionUtils.isNotEmpty(currentServices)) {
for (Instance healthService : currentServices) {
if (StringUtils.equals(healthService.getId(), registration.getInstanceId())) {
tempInstanceLaneTag = healthService.getMetadata().get("lane");
break;
}
}
}
freshLaneStatus();
if (BaseLaneMode.ONLY_UNTAGGED_INSTANCE.equals(baseLaneMode)) {
instanceLaneTag = tempInstanceLaneTag;
}
else {
// if baseLaneMode is EXCLUDE_ENABLED_LANE_INSTANCE, check if the instance lane tag is in the lane
boolean laneTagExist = false;
for (LaneProto.LaneGroup group : getGroups()) {
for (LaneProto.LaneRule rule : group.getRulesList()) {
if (StringUtils.equals(rule.getDefaultLabelValue(), tempInstanceLaneTag)) {
laneTagExist = true;
}
}
}
instanceLaneTag = laneTagExist ? tempInstanceLaneTag : "";
}
}
/**
@ -128,11 +160,11 @@ public class PolarisActiveLane extends AbstractActiveLane implements Initializin
}
public boolean currentInstanceInLane() {
return StringUtils.isNotEmpty(lane) && serviceInLane;
return StringUtils.isNotEmpty(instanceLaneTag) && serviceInLane;
}
public String getLane() {
return lane;
public String getInstanceLaneTag() {
return instanceLaneTag;
}
public List<LaneProto.LaneGroup> getGroups() {
@ -165,7 +197,7 @@ public class PolarisActiveLane extends AbstractActiveLane implements Initializin
for (LaneProto.LaneGroup group : getGroups()) {
for (LaneProto.LaneRule rule : group.getRulesList()) {
if (StringUtils.equals(messageLaneId, LaneUtils.buildStainLabel(rule))
&& StringUtils.equals(rule.getDefaultLabelValue(), getLane())) {
&& StringUtils.equals(rule.getDefaultLabelValue(), getInstanceLaneTag())) {
return true;
}
}

@ -17,6 +17,7 @@
package com.tencent.cloud.plugin.mq.lane;
import java.lang.reflect.Field;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@ -28,6 +29,7 @@ import com.tencent.cloud.polaris.context.PolarisSDKContextManager;
import com.tencent.cloud.polaris.discovery.PolarisDiscoveryHandler;
import com.tencent.polaris.api.pojo.Instance;
import com.tencent.polaris.client.api.SDKContext;
import com.tencent.polaris.plugins.router.lane.BaseLaneMode;
import com.tencent.polaris.plugins.router.lane.LaneUtils;
import com.tencent.polaris.specification.api.v1.traffic.manage.LaneProto;
import org.junit.jupiter.api.AfterEach;
@ -119,4 +121,60 @@ public class PolarisActiveLaneTest {
polarisActiveLane.freshLaneStatus(); // rule listener will call this method
assertThat(polarisActiveLane.currentInstanceInLane()).isFalse();
}
@Test
public void testCallback_baseLaneMode1() throws Throwable {
Field baseLaneModeField = PolarisActiveLane.class.getDeclaredField("baseLaneMode");
baseLaneModeField.setAccessible(true);
baseLaneModeField.set(polarisActiveLane, BaseLaneMode.EXCLUDE_ENABLED_LANE_INSTANCE);
// not in lane
assertThat(polarisActiveLane.currentInstanceInLane()).isFalse();
// not in lane
Map<String, String> metadata = new HashMap<>();
metadata.put("lane", "lane-not-exist");
Instance instance = mock(Instance.class);
when(instance.getId()).thenReturn(CURRENT_INSTANCE_ID);
when(instance.getMetadata()).thenReturn(metadata);
laneUtilsMockedStatic.when(() -> LaneUtils.getLaneGroups(any(), any()))
.thenReturn(Collections.singletonList(group));
LaneProto.LaneRule rule = mock(LaneProto.LaneRule.class);
when(rule.getDefaultLabelValue()).thenReturn("lane-exist");
when(group.getRulesList()).thenReturn(Collections.singletonList(rule));
polarisActiveLane.callback(Collections.singletonList(instance));
assertThat(polarisActiveLane.currentInstanceInLane()).isFalse();
}
@Test
public void testCallback_baseLaneMode2() throws Throwable {
Field baseLaneModeField = PolarisActiveLane.class.getDeclaredField("baseLaneMode");
baseLaneModeField.setAccessible(true);
baseLaneModeField.set(polarisActiveLane, BaseLaneMode.EXCLUDE_ENABLED_LANE_INSTANCE);
// not in lane
assertThat(polarisActiveLane.currentInstanceInLane()).isFalse();
// in lane
Map<String, String> metadata = new HashMap<>();
metadata.put("lane", "lane-exist");
Instance instance = mock(Instance.class);
when(instance.getId()).thenReturn(CURRENT_INSTANCE_ID);
when(instance.getMetadata()).thenReturn(metadata);
laneUtilsMockedStatic.when(() -> LaneUtils.getLaneGroups(any(), any()))
.thenReturn(Collections.singletonList(group));
LaneProto.LaneRule rule = mock(LaneProto.LaneRule.class);
when(rule.getDefaultLabelValue()).thenReturn("lane-exist");
when(group.getRulesList()).thenReturn(Collections.singletonList(rule));
polarisActiveLane.callback(Collections.singletonList(instance));
assertThat(polarisActiveLane.currentInstanceInLane()).isTrue();
}
}

@ -74,7 +74,7 @@ public class KafkaLaneAspectTest {
polarisActiveLane = new PolarisActiveLane(mock(PolarisSDKContextManager.class), mock(PolarisDiscoveryHandler.class), kafkaLaneProperties, mock(Registration.class));
laneUtilsMockedStatic = Mockito.mockStatic(LaneUtils.class);
tsfContextUtilsMockedStatic = Mockito.mockStatic(TsfContextUtils.class);
Field laneField = PolarisActiveLane.class.getDeclaredField("lane");
Field laneField = PolarisActiveLane.class.getDeclaredField("instanceLaneTag");
laneField.setAccessible(true);
laneField.set(polarisActiveLane, "test-lane"); // in lane

@ -75,7 +75,7 @@ public class KafkaLaneAspectTest2 {
laneUtilsMockedStatic = Mockito.mockStatic(LaneUtils.class);
tsfContextUtilsMockedStatic = Mockito.mockStatic(TsfContextUtils.class);
Field laneField = PolarisActiveLane.class.getDeclaredField("lane");
Field laneField = PolarisActiveLane.class.getDeclaredField("instanceLaneTag");
laneField.setAccessible(true);
laneField.set(polarisActiveLane, ""); // not in lane

@ -78,7 +78,7 @@ public class KafkaLaneAspectTest3 {
@Test
public void testConsumerAspectWithLaneHeader_toInLane() throws Throwable {
// not in lane
Field laneField = PolarisActiveLane.class.getDeclaredField("lane");
Field laneField = PolarisActiveLane.class.getDeclaredField("instanceLaneTag");
laneField.setAccessible(true);
laneField.set(polarisActiveLane, "");
@ -133,7 +133,7 @@ public class KafkaLaneAspectTest3 {
when(rule.getDefaultLabelValue()).thenReturn("test-lane");
laneUtilsMockedStatic.when(() -> LaneUtils.buildStainLabel(rule)).thenReturn(laneId);
Field laneField = PolarisActiveLane.class.getDeclaredField("lane");
Field laneField = PolarisActiveLane.class.getDeclaredField("instanceLaneTag");
laneField.setAccessible(true);
laneField.set(polarisActiveLane, "test-lane");

Loading…
Cancel
Save