parent
2c9979b392
commit
b9401a615c
@ -0,0 +1,53 @@
|
||||
/*
|
||||
* Tencent is pleased to support the open source community by making spring-cloud-tencent available.
|
||||
*
|
||||
* Copyright (C) 2021 Tencent. All rights reserved.
|
||||
*
|
||||
* Licensed under the BSD 3-Clause License (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://opensource.org/licenses/BSD-3-Clause
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software distributed
|
||||
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
|
||||
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations under the License.
|
||||
*/
|
||||
|
||||
package com.tencent.cloud.plugin.mq.lane;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
import com.tencent.cloud.common.metadata.MetadataContext;
|
||||
import com.tencent.polaris.api.plugin.registry.AbstractResourceEventListener;
|
||||
import com.tencent.polaris.api.pojo.Instance;
|
||||
import com.tencent.polaris.api.pojo.RegistryCacheValue;
|
||||
import com.tencent.polaris.api.pojo.ServiceEventKey;
|
||||
import com.tencent.polaris.api.utils.StringUtils;
|
||||
import com.tencent.polaris.client.pojo.ServiceInstancesByProto;
|
||||
|
||||
public abstract class AbstractActiveLane extends AbstractResourceEventListener {
|
||||
|
||||
public abstract boolean ifConsume(String laneId);
|
||||
|
||||
public abstract String getLaneHeaderKey();
|
||||
|
||||
public String formatLaneId(String laneId) {
|
||||
return laneId;
|
||||
}
|
||||
|
||||
public abstract void callback(List<Instance> currentServiceInstances);
|
||||
|
||||
@Override
|
||||
public void onResourceUpdated(ServiceEventKey svcEventKey, RegistryCacheValue oldValue, RegistryCacheValue newValue) {
|
||||
|
||||
if (newValue.getEventType() == ServiceEventKey.EventType.INSTANCE
|
||||
&& newValue instanceof ServiceInstancesByProto
|
||||
&& StringUtils.equals(svcEventKey.getService(), MetadataContext.LOCAL_SERVICE)) {
|
||||
|
||||
ServiceInstancesByProto newIns = (ServiceInstancesByProto) newValue;
|
||||
callback(newIns.getInstances());
|
||||
}
|
||||
}
|
||||
}
|
||||
19
spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-mq-plugin/src/main/java/com/tencent/cloud/plugin/mq/lane/tsf/TsfLaneRuleListener.java → spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-mq-plugin/src/main/java/com/tencent/cloud/plugin/mq/lane/LaneRuleListener.java
19
spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-mq-plugin/src/main/java/com/tencent/cloud/plugin/mq/lane/tsf/TsfLaneRuleListener.java → spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-mq-plugin/src/main/java/com/tencent/cloud/plugin/mq/lane/LaneRuleListener.java
@ -0,0 +1,182 @@
|
||||
/*
|
||||
* Tencent is pleased to support the open source community by making spring-cloud-tencent available.
|
||||
*
|
||||
* Copyright (C) 2021 Tencent. All rights reserved.
|
||||
*
|
||||
* Licensed under the BSD 3-Clause License (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://opensource.org/licenses/BSD-3-Clause
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software distributed
|
||||
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
|
||||
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations under the License.
|
||||
*/
|
||||
|
||||
package com.tencent.cloud.plugin.mq.lane;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
|
||||
import com.tencent.cloud.common.metadata.MetadataContext;
|
||||
import com.tencent.cloud.common.util.JacksonUtils;
|
||||
import com.tencent.cloud.plugin.mq.lane.kafka.KafkaLaneProperties;
|
||||
import com.tencent.cloud.polaris.context.PolarisSDKContextManager;
|
||||
import com.tencent.cloud.polaris.discovery.PolarisDiscoveryHandler;
|
||||
import com.tencent.polaris.api.plugin.compose.Extensions;
|
||||
import com.tencent.polaris.api.pojo.Instance;
|
||||
import com.tencent.polaris.api.pojo.ServiceKey;
|
||||
import com.tencent.polaris.api.utils.CollectionUtils;
|
||||
import com.tencent.polaris.api.utils.StringUtils;
|
||||
import com.tencent.polaris.client.api.SDKContext;
|
||||
import com.tencent.polaris.plugins.router.lane.LaneRouter;
|
||||
import com.tencent.polaris.plugins.router.lane.LaneUtils;
|
||||
import com.tencent.polaris.specification.api.v1.traffic.manage.LaneProto;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.springframework.beans.factory.InitializingBean;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.cloud.client.serviceregistry.Registration;
|
||||
|
||||
|
||||
public class PolarisActiveLane extends AbstractActiveLane implements InitializingBean {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(PolarisActiveLane.class);
|
||||
|
||||
private final PolarisSDKContextManager polarisSDKContextManager;
|
||||
|
||||
private final PolarisDiscoveryHandler discoveryClient;
|
||||
|
||||
private final KafkaLaneProperties kafkaLaneProperties;
|
||||
|
||||
@Value("${spring.application.name:}")
|
||||
private String springApplicationName;
|
||||
|
||||
private volatile String lane = "";
|
||||
|
||||
private volatile boolean serviceInLane = false;
|
||||
|
||||
private volatile List<LaneProto.LaneGroup> groups;
|
||||
|
||||
private Registration registration;
|
||||
|
||||
public PolarisActiveLane(PolarisSDKContextManager polarisSDKContextManager, PolarisDiscoveryHandler discoveryClient,
|
||||
KafkaLaneProperties kafkaLaneProperties, Registration registration) {
|
||||
this.polarisSDKContextManager = polarisSDKContextManager;
|
||||
this.discoveryClient = discoveryClient;
|
||||
this.kafkaLaneProperties = kafkaLaneProperties;
|
||||
this.registration = registration;
|
||||
Optional.ofNullable(polarisSDKContextManager).map(PolarisSDKContextManager::getSDKContext)
|
||||
.map(SDKContext::getExtensions).map(Extensions::getLocalRegistry)
|
||||
.ifPresent(localRegistry -> localRegistry.registerResourceListener(new LaneRuleListener(this::freshLaneStatus)));
|
||||
|
||||
Optional.ofNullable(polarisSDKContextManager).map(PolarisSDKContextManager::getSDKContext)
|
||||
.map(SDKContext::getExtensions).map(Extensions::getLocalRegistry)
|
||||
.ifPresent(localRegistry -> localRegistry.registerResourceListener(this));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void afterPropertiesSet() {
|
||||
// get instances to trigger callback when instances change
|
||||
discoveryClient.getHealthyInstances(springApplicationName);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void callback(List<Instance> currentServiceInstances) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("currentServices: {}", JacksonUtils.serialize2Json(currentServiceInstances));
|
||||
}
|
||||
|
||||
freshWhenInstancesChange(currentServiceInstances);
|
||||
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("current lane:{}, serviceInLane: {}", lane, serviceInLane);
|
||||
}
|
||||
}
|
||||
|
||||
private void freshWhenInstancesChange(List<Instance> currentServices) {
|
||||
if (currentServices == null || currentServices.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
|
||||
// get all active groups
|
||||
for (Instance healthService : currentServices) {
|
||||
if (StringUtils.equals(healthService.getId(), registration.getInstanceId())) {
|
||||
this.lane = healthService.getMetadata().get("lane");
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
freshLaneStatus();
|
||||
}
|
||||
|
||||
/**
|
||||
* update lane status.
|
||||
*/
|
||||
public void freshLaneStatus() {
|
||||
|
||||
ServiceKey localService = new ServiceKey(com.tencent.cloud.common.metadata.MetadataContext.LOCAL_NAMESPACE,
|
||||
com.tencent.cloud.common.metadata.MetadataContext.LOCAL_SERVICE);
|
||||
|
||||
groups = LaneUtils.getLaneGroups(localService, polarisSDKContextManager.getSDKContext().getExtensions());
|
||||
|
||||
serviceInLane = CollectionUtils.isNotEmpty(groups);
|
||||
}
|
||||
|
||||
public boolean currentInstanceInLane() {
|
||||
return StringUtils.isNotEmpty(lane) && serviceInLane;
|
||||
}
|
||||
|
||||
public String getLane() {
|
||||
return lane;
|
||||
}
|
||||
|
||||
public List<LaneProto.LaneGroup> getGroups() {
|
||||
return groups == null ? Collections.emptyList() : groups;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean ifConsume(String messageLaneId) {
|
||||
// message has no lane id
|
||||
if (StringUtils.isEmpty(messageLaneId)) {
|
||||
if (!currentInstanceInLane()) {
|
||||
// baseline service, consume directly
|
||||
return true;
|
||||
}
|
||||
else {
|
||||
// lane listener consumes baseline message
|
||||
return this.kafkaLaneProperties.getLaneConsumeMain();
|
||||
}
|
||||
}
|
||||
else {
|
||||
LaneUtils.setCallerLaneId(messageLaneId);
|
||||
|
||||
// message has lane id
|
||||
if (!currentInstanceInLane()) {
|
||||
// baseline service
|
||||
return this.kafkaLaneProperties.getMainConsumeLane();
|
||||
}
|
||||
else {
|
||||
// whether the message lane id is the same as the lane id of the listener
|
||||
for (LaneProto.LaneGroup group : getGroups()) {
|
||||
for (LaneProto.LaneRule rule : group.getRulesList()) {
|
||||
if (StringUtils.equals(messageLaneId, LaneUtils.buildStainLabel(rule))
|
||||
&& StringUtils.equals(rule.getDefaultLabelValue(), getLane())) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getLaneHeaderKey() {
|
||||
return MetadataContext.DEFAULT_TRANSITIVE_PREFIX + LaneRouter.TRAFFIC_STAIN_LABEL;
|
||||
}
|
||||
}
|
||||
27
spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-mq-plugin/src/test/java/com/tencent/cloud/plugin/mq/lane/tsf/TsfLaneRuleListenerTest.java → spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-mq-plugin/src/test/java/com/tencent/cloud/plugin/mq/lane/LaneRuleListenerTest.java
27
spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-mq-plugin/src/test/java/com/tencent/cloud/plugin/mq/lane/tsf/TsfLaneRuleListenerTest.java → spring-cloud-tencent-plugin-starters/spring-cloud-starter-tencent-mq-plugin/src/test/java/com/tencent/cloud/plugin/mq/lane/LaneRuleListenerTest.java
@ -0,0 +1,122 @@
|
||||
/*
|
||||
* Tencent is pleased to support the open source community by making spring-cloud-tencent available.
|
||||
*
|
||||
* Copyright (C) 2021 Tencent. All rights reserved.
|
||||
*
|
||||
* Licensed under the BSD 3-Clause License (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://opensource.org/licenses/BSD-3-Clause
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software distributed
|
||||
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
|
||||
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations under the License.
|
||||
*/
|
||||
|
||||
package com.tencent.cloud.plugin.mq.lane;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import com.tencent.cloud.common.tsf.TsfContextUtils;
|
||||
import com.tencent.cloud.common.util.JacksonUtils;
|
||||
import com.tencent.cloud.plugin.mq.lane.kafka.KafkaLaneProperties;
|
||||
import com.tencent.cloud.polaris.context.PolarisSDKContextManager;
|
||||
import com.tencent.cloud.polaris.discovery.PolarisDiscoveryHandler;
|
||||
import com.tencent.polaris.api.pojo.Instance;
|
||||
import com.tencent.polaris.client.api.SDKContext;
|
||||
import com.tencent.polaris.plugins.router.lane.LaneUtils;
|
||||
import com.tencent.polaris.specification.api.v1.traffic.manage.LaneProto;
|
||||
import org.junit.jupiter.api.AfterEach;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.mockito.MockedStatic;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import org.springframework.cloud.client.serviceregistry.Registration;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
/**
|
||||
* Test for {@link PolarisActiveLane}.
|
||||
* Instance and service status change.
|
||||
*/
|
||||
public class PolarisActiveLaneTest {
|
||||
|
||||
private static final String CURRENT_INSTANCE_ID = "current-instance-id";
|
||||
|
||||
private KafkaLaneProperties kafkaLaneProperties;
|
||||
private PolarisActiveLane polarisActiveLane;
|
||||
private PolarisSDKContextManager polarisSDKContextManager;
|
||||
private Registration registration;
|
||||
private LaneProto.LaneGroup group;
|
||||
private MockedStatic<LaneUtils> laneUtilsMockedStatic;
|
||||
private MockedStatic<JacksonUtils> jacksonUtilsMockedStatic;
|
||||
private MockedStatic<TsfContextUtils> tsfContextUtilsMockedStatic;
|
||||
|
||||
@BeforeEach
|
||||
public void setUp() throws Exception {
|
||||
group = mock(LaneProto.LaneGroup.class);
|
||||
kafkaLaneProperties = new KafkaLaneProperties();
|
||||
kafkaLaneProperties.setLaneOn(true);
|
||||
polarisSDKContextManager = mock(PolarisSDKContextManager.class);
|
||||
when(polarisSDKContextManager.getSDKContext()).thenReturn(mock(SDKContext.class));
|
||||
registration = mock(Registration.class);
|
||||
when(registration.getInstanceId()).thenReturn(CURRENT_INSTANCE_ID);
|
||||
polarisActiveLane = new PolarisActiveLane(polarisSDKContextManager, mock(PolarisDiscoveryHandler.class), kafkaLaneProperties, registration);
|
||||
laneUtilsMockedStatic = Mockito.mockStatic(LaneUtils.class);
|
||||
jacksonUtilsMockedStatic = Mockito.mockStatic(JacksonUtils.class);
|
||||
tsfContextUtilsMockedStatic = Mockito.mockStatic(TsfContextUtils.class);
|
||||
|
||||
jacksonUtilsMockedStatic.when(() -> JacksonUtils.serialize2Json(any())).thenReturn("{}");
|
||||
|
||||
}
|
||||
|
||||
@AfterEach
|
||||
public void tearDown() {
|
||||
laneUtilsMockedStatic.close();
|
||||
jacksonUtilsMockedStatic.close();
|
||||
tsfContextUtilsMockedStatic.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCallback() throws Throwable {
|
||||
// not in lane
|
||||
assertThat(polarisActiveLane.currentInstanceInLane()).isFalse();
|
||||
|
||||
// in lane
|
||||
Map<String, String> metadata = new HashMap<>();
|
||||
metadata.put("lane", "test-lane");
|
||||
|
||||
Instance instance = mock(Instance.class);
|
||||
when(instance.getId()).thenReturn(CURRENT_INSTANCE_ID);
|
||||
when(instance.getMetadata()).thenReturn(metadata);
|
||||
laneUtilsMockedStatic.when(() -> LaneUtils.getLaneGroups(any(), any()))
|
||||
.thenReturn(Collections.singletonList(group));
|
||||
|
||||
polarisActiveLane.callback(Collections.singletonList(instance));
|
||||
|
||||
assertThat(polarisActiveLane.currentInstanceInLane()).isTrue();
|
||||
|
||||
// instance not in lane
|
||||
metadata.remove("lane");
|
||||
polarisActiveLane.callback(Collections.singletonList(instance));
|
||||
assertThat(polarisActiveLane.currentInstanceInLane()).isFalse();
|
||||
|
||||
// reset, instance in lane
|
||||
metadata.put("lane", "test-lane");
|
||||
polarisActiveLane.callback(Collections.singletonList(instance));
|
||||
assertThat(polarisActiveLane.currentInstanceInLane()).isTrue();
|
||||
|
||||
// service not in lane
|
||||
laneUtilsMockedStatic.when(() -> LaneUtils.getLaneGroups(any(), any())).thenReturn(Collections.emptyList());
|
||||
polarisActiveLane.freshLaneStatus(); // rule listener will call this method
|
||||
assertThat(polarisActiveLane.currentInstanceInLane()).isFalse();
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,333 @@
|
||||
/*
|
||||
* Tencent is pleased to support the open source community by making spring-cloud-tencent available.
|
||||
*
|
||||
* Copyright (C) 2021 Tencent. All rights reserved.
|
||||
*
|
||||
* Licensed under the BSD 3-Clause License (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://opensource.org/licenses/BSD-3-Clause
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software distributed
|
||||
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
|
||||
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations under the License.
|
||||
*/
|
||||
|
||||
package com.tencent.cloud.plugin.mq.lane.kafka;
|
||||
|
||||
import java.lang.reflect.Field;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
|
||||
import com.tencent.cloud.common.tsf.TsfContextUtils;
|
||||
import com.tencent.cloud.plugin.mq.lane.PolarisActiveLane;
|
||||
import com.tencent.cloud.polaris.context.PolarisSDKContextManager;
|
||||
import com.tencent.cloud.polaris.discovery.PolarisDiscoveryHandler;
|
||||
import com.tencent.polaris.plugins.router.lane.LaneUtils;
|
||||
import com.tencent.polaris.specification.api.v1.traffic.manage.LaneProto;
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
||||
import org.apache.kafka.clients.producer.ProducerRecord;
|
||||
import org.apache.kafka.common.header.Header;
|
||||
import org.aspectj.lang.ProceedingJoinPoint;
|
||||
import org.junit.jupiter.api.AfterEach;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.mockito.MockedStatic;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import org.springframework.cloud.client.serviceregistry.Registration;
|
||||
import org.springframework.kafka.core.KafkaTemplate;
|
||||
import org.springframework.kafka.support.Acknowledgment;
|
||||
import org.springframework.kafka.support.converter.RecordMessageConverter;
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.messaging.support.MessageBuilder;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
/**
|
||||
* Test for {@link KafkaLaneAspect}. Instance and service not in lane.
|
||||
*/
|
||||
public class KafkaLaneAspectTest2 {
|
||||
|
||||
private KafkaLaneAspect kafkaLaneAspect;
|
||||
private PolarisSDKContextManager polarisSDKContextManager;
|
||||
private KafkaLaneProperties kafkaLaneProperties;
|
||||
private PolarisActiveLane polarisActiveLane;
|
||||
private LaneProto.LaneGroup group;
|
||||
private MockedStatic<LaneUtils> laneUtilsMockedStatic;
|
||||
private MockedStatic<TsfContextUtils> tsfContextUtilsMockedStatic;
|
||||
|
||||
@BeforeEach
|
||||
public void setUp() throws Exception {
|
||||
polarisSDKContextManager = mock(PolarisSDKContextManager.class);
|
||||
group = mock(LaneProto.LaneGroup.class);
|
||||
kafkaLaneProperties = new KafkaLaneProperties();
|
||||
polarisActiveLane = new PolarisActiveLane(mock(PolarisSDKContextManager.class), mock(PolarisDiscoveryHandler.class), kafkaLaneProperties, mock(Registration.class));
|
||||
laneUtilsMockedStatic = Mockito.mockStatic(LaneUtils.class);
|
||||
tsfContextUtilsMockedStatic = Mockito.mockStatic(TsfContextUtils.class);
|
||||
|
||||
Field laneField = PolarisActiveLane.class.getDeclaredField("lane");
|
||||
laneField.setAccessible(true);
|
||||
laneField.set(polarisActiveLane, ""); // not in lane
|
||||
|
||||
Field groupsField = PolarisActiveLane.class.getDeclaredField("groups");
|
||||
groupsField.setAccessible(true);
|
||||
groupsField.set(polarisActiveLane, Collections.singletonList(group));
|
||||
|
||||
Field serviceInLaneField = PolarisActiveLane.class.getDeclaredField("serviceInLane");
|
||||
serviceInLaneField.setAccessible(true);
|
||||
serviceInLaneField.setBoolean(polarisActiveLane, false);
|
||||
|
||||
|
||||
kafkaLaneAspect = new KafkaLaneAspect(polarisSDKContextManager, kafkaLaneProperties, polarisActiveLane);
|
||||
}
|
||||
|
||||
@AfterEach
|
||||
public void tearDown() {
|
||||
laneUtilsMockedStatic.close();
|
||||
tsfContextUtilsMockedStatic.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testProducerAspectWhenLaneDisabled() throws Throwable {
|
||||
// Given
|
||||
kafkaLaneProperties.setLaneOn(false);
|
||||
ProceedingJoinPoint pjp = mock(ProceedingJoinPoint.class);
|
||||
Object[] args = new Object[] {mock(ProducerRecord.class)};
|
||||
when(pjp.getArgs()).thenReturn(args);
|
||||
when(pjp.proceed(args)).thenReturn("result");
|
||||
|
||||
// When
|
||||
Object result = kafkaLaneAspect.aroundProducerMessage(pjp);
|
||||
|
||||
// Then
|
||||
assertThat(result).isEqualTo("result");
|
||||
verify(pjp).proceed(args);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testProducerAspectWhenNoLaneId() throws Throwable {
|
||||
// Given
|
||||
kafkaLaneProperties.setLaneOn(true);
|
||||
laneUtilsMockedStatic.when(() -> LaneUtils.fetchLaneByCaller(any(), any(), any())).thenReturn(null);
|
||||
|
||||
ProceedingJoinPoint pjp = mock(ProceedingJoinPoint.class);
|
||||
Object[] args = new Object[] {mock(ProducerRecord.class)};
|
||||
when(pjp.getArgs()).thenReturn(args);
|
||||
when(pjp.proceed(args)).thenReturn("result");
|
||||
|
||||
// When
|
||||
Object result = kafkaLaneAspect.aroundProducerMessage(pjp);
|
||||
|
||||
// Then
|
||||
assertThat(result).isEqualTo("result");
|
||||
verify(pjp).proceed(args);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testProducerAspectWithProducerRecord() throws Throwable {
|
||||
// Given
|
||||
kafkaLaneProperties.setLaneOn(true);
|
||||
String laneId = "test-lane-id";
|
||||
laneUtilsMockedStatic.when(() -> LaneUtils.fetchLaneByCaller(any(), any(), any())).thenReturn(laneId);
|
||||
|
||||
ProceedingJoinPoint pjp = mock(ProceedingJoinPoint.class);
|
||||
KafkaTemplate kafkaTemplate = mock(KafkaTemplate.class);
|
||||
ProducerRecord producerRecord = new ProducerRecord<>("test-topic", "test-value");
|
||||
|
||||
when(pjp.getTarget()).thenReturn(kafkaTemplate);
|
||||
when(pjp.getArgs()).thenReturn(new Object[] {producerRecord});
|
||||
when(kafkaTemplate.send(producerRecord)).thenReturn(null);
|
||||
|
||||
// When
|
||||
kafkaLaneAspect.aroundProducerMessage(pjp);
|
||||
|
||||
// Then
|
||||
Iterator<Header> headers = producerRecord.headers().headers(polarisActiveLane.getLaneHeaderKey()).iterator();
|
||||
assertThat(headers.hasNext()).isTrue();
|
||||
Header laneHeader = headers.next();
|
||||
assertThat(new String(laneHeader.value(), StandardCharsets.UTF_8)).isEqualTo(laneId);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testProducerAspectWithMessage() throws Throwable {
|
||||
// Given
|
||||
kafkaLaneProperties.setLaneOn(true);
|
||||
String laneId = "test-lane-id";
|
||||
laneUtilsMockedStatic.when(() -> LaneUtils.fetchLaneByCaller(any(), any(), any())).thenReturn(laneId);
|
||||
|
||||
ProceedingJoinPoint pjp = mock(ProceedingJoinPoint.class);
|
||||
KafkaTemplate kafkaTemplate = mock(KafkaTemplate.class);
|
||||
RecordMessageConverter recordMessageConverter = mock(RecordMessageConverter.class);
|
||||
Message message = MessageBuilder.withPayload("test-payload").build();
|
||||
ProducerRecord producerRecord = new ProducerRecord<>("test-topic", "test-value");
|
||||
|
||||
when(pjp.getTarget()).thenReturn(kafkaTemplate);
|
||||
when(pjp.getArgs()).thenReturn(new Object[] {message});
|
||||
when(kafkaTemplate.getDefaultTopic()).thenReturn("test-topic");
|
||||
when(kafkaTemplate.send(producerRecord)).thenReturn(null);
|
||||
when(kafkaTemplate.getMessageConverter()).thenReturn(recordMessageConverter);
|
||||
|
||||
// When
|
||||
kafkaLaneAspect.aroundProducerMessage(pjp);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testConsumerAspectWhenLaneDisabled() throws Throwable {
|
||||
// Given
|
||||
kafkaLaneProperties.setLaneOn(false);
|
||||
ProceedingJoinPoint pjp = mock(ProceedingJoinPoint.class);
|
||||
Object[] args = new Object[] {mock(ConsumerRecord.class)};
|
||||
when(pjp.getArgs()).thenReturn(args);
|
||||
when(pjp.proceed(args)).thenReturn("result");
|
||||
|
||||
// When
|
||||
Object result = kafkaLaneAspect.aroundConsumerMessage(pjp);
|
||||
|
||||
// Then
|
||||
assertThat(result).isEqualTo("result");
|
||||
verify(pjp).proceed(args);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testConsumerAspectWithLaneHeader_LaneIdExist() throws Throwable {
|
||||
// Given
|
||||
LaneProto.LaneRule rule = mock(LaneProto.LaneRule.class);
|
||||
when(group.getRulesList()).thenReturn(Collections.singletonList(rule));
|
||||
when(rule.getGroupName()).thenReturn("test-group");
|
||||
when(rule.getName()).thenReturn("test-lane-name");
|
||||
when(rule.getDefaultLabelValue()).thenReturn("test-lane");
|
||||
kafkaLaneProperties.setLaneOn(true);
|
||||
String laneId = "test-group/test-lane-name"; // valid lane id
|
||||
|
||||
laneUtilsMockedStatic.when(() -> LaneUtils.buildStainLabel(rule)).thenReturn(laneId);
|
||||
|
||||
|
||||
ConsumerRecord consumerRecord = new ConsumerRecord<>("test-topic", 0, 0L, "key", "value");
|
||||
consumerRecord.headers().add(polarisActiveLane.getLaneHeaderKey(), laneId.getBytes(StandardCharsets.UTF_8));
|
||||
|
||||
ProceedingJoinPoint pjp = mock(ProceedingJoinPoint.class);
|
||||
Object[] args = new Object[] {consumerRecord};
|
||||
when(pjp.getArgs()).thenReturn(args);
|
||||
when(pjp.proceed(args)).thenReturn("result");
|
||||
|
||||
// When
|
||||
Object result = kafkaLaneAspect.aroundConsumerMessage(pjp);
|
||||
|
||||
// Then
|
||||
assertThat(result).isEqualTo(KafkaLaneAspect.EMPTY_OBJECT);
|
||||
|
||||
// change main consume lane
|
||||
kafkaLaneProperties.setMainConsumeLane(true);
|
||||
result = kafkaLaneAspect.aroundConsumerMessage(pjp);
|
||||
assertThat(result).isEqualTo("result");
|
||||
|
||||
//reset
|
||||
kafkaLaneProperties.setMainConsumeLane(false);
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetConsumerRecordLaneIdFromHeader() {
|
||||
// Given
|
||||
ConsumerRecord consumerRecord = new ConsumerRecord<>("test-topic", 0, 0L, "key", "value");
|
||||
String expectedLaneId = "test-lane-id";
|
||||
consumerRecord.headers()
|
||||
.add(polarisActiveLane.getLaneHeaderKey(), expectedLaneId.getBytes(StandardCharsets.UTF_8));
|
||||
|
||||
// When
|
||||
String laneId = kafkaLaneAspect.getConsumerRecordLaneId(consumerRecord);
|
||||
|
||||
// Then
|
||||
assertThat(laneId).isEqualTo(expectedLaneId);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetConsumerRecordLaneIdFromCallerLane() {
|
||||
// Given
|
||||
ConsumerRecord consumerRecord = new ConsumerRecord<>("test-topic", 0, 0L, "key", "value");
|
||||
String expectedLaneId = "lane-test";
|
||||
laneUtilsMockedStatic.when(() -> LaneUtils.getCallerLaneId()).thenReturn(expectedLaneId);
|
||||
|
||||
// When
|
||||
String laneId = kafkaLaneAspect.getConsumerRecordLaneId(consumerRecord);
|
||||
|
||||
// Then
|
||||
assertThat(laneId).isEqualTo(expectedLaneId);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIfConsumeWithNoLaneId() {
|
||||
// Given
|
||||
kafkaLaneProperties.setLaneConsumeMain(true);
|
||||
// When
|
||||
boolean shouldConsume = kafkaLaneAspect.ifConsume("");
|
||||
// Then
|
||||
assertThat(shouldConsume).isTrue(); // as baseline
|
||||
|
||||
// Given
|
||||
kafkaLaneProperties.setLaneConsumeMain(false);
|
||||
// When
|
||||
shouldConsume = kafkaLaneAspect.ifConsume("");
|
||||
// Then
|
||||
assertThat(shouldConsume).isTrue(); // unrelated to LaneConsumeMain
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testConsumerAspectWithBatchMessages() throws Throwable {
|
||||
// Given
|
||||
kafkaLaneProperties.setLaneOn(true);
|
||||
|
||||
List<ConsumerRecord> messageList = new ArrayList<>();
|
||||
ConsumerRecord record1 = new ConsumerRecord<>("test-topic", 0, 0L, "key", "value1");
|
||||
record1.headers().add(polarisActiveLane.getLaneHeaderKey(), "lane1".getBytes(StandardCharsets.UTF_8));
|
||||
ConsumerRecord record2 = new ConsumerRecord<>("test-topic", 0, 1L, "key", "value2");
|
||||
record2.headers().add(polarisActiveLane.getLaneHeaderKey(), "lane2".getBytes(StandardCharsets.UTF_8));
|
||||
|
||||
messageList.add(record1);
|
||||
messageList.add(record2);
|
||||
|
||||
Acknowledgment acknowledgment = mock(Acknowledgment.class);
|
||||
ProceedingJoinPoint pjp = mock(ProceedingJoinPoint.class);
|
||||
Object[] args = new Object[] {messageList, acknowledgment};
|
||||
|
||||
when(pjp.getArgs()).thenReturn(args);
|
||||
when(pjp.proceed(any())).thenReturn("result");
|
||||
|
||||
// When
|
||||
Object result = kafkaLaneAspect.aroundConsumerMessage(pjp);
|
||||
|
||||
// Then
|
||||
assertThat(result).isEqualTo("result");
|
||||
verify(pjp).proceed(any());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testConsumerAspectWithEmptyBatch() throws Throwable {
|
||||
// Given
|
||||
kafkaLaneProperties.setLaneOn(true);
|
||||
|
||||
List<ConsumerRecord> emptyList = new ArrayList<>();
|
||||
ProceedingJoinPoint pjp = mock(ProceedingJoinPoint.class);
|
||||
Object[] args = new Object[] {emptyList};
|
||||
|
||||
when(pjp.getArgs()).thenReturn(args);
|
||||
when(pjp.proceed(args)).thenReturn("result");
|
||||
|
||||
// When
|
||||
Object result = kafkaLaneAspect.aroundConsumerMessage(pjp);
|
||||
|
||||
// Then
|
||||
assertThat(result).isEqualTo("result");
|
||||
verify(pjp).proceed(args);
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,172 @@
|
||||
/*
|
||||
* Tencent is pleased to support the open source community by making spring-cloud-tencent available.
|
||||
*
|
||||
* Copyright (C) 2021 Tencent. All rights reserved.
|
||||
*
|
||||
* Licensed under the BSD 3-Clause License (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://opensource.org/licenses/BSD-3-Clause
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software distributed
|
||||
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
|
||||
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations under the License.
|
||||
*/
|
||||
|
||||
package com.tencent.cloud.plugin.mq.lane.kafka;
|
||||
|
||||
import java.lang.reflect.Field;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.Collections;
|
||||
|
||||
import com.tencent.cloud.common.tsf.TsfContextUtils;
|
||||
import com.tencent.cloud.plugin.mq.lane.PolarisActiveLane;
|
||||
import com.tencent.cloud.polaris.context.PolarisSDKContextManager;
|
||||
import com.tencent.cloud.polaris.discovery.PolarisDiscoveryHandler;
|
||||
import com.tencent.polaris.plugins.router.lane.LaneUtils;
|
||||
import com.tencent.polaris.specification.api.v1.traffic.manage.LaneProto;
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
||||
import org.aspectj.lang.ProceedingJoinPoint;
|
||||
import org.junit.jupiter.api.AfterEach;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.mockito.MockedStatic;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import org.springframework.cloud.client.serviceregistry.Registration;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
/**
|
||||
* Test for {@link KafkaLaneAspect}.
|
||||
* Instance and service status change.
|
||||
*/
|
||||
public class KafkaLaneAspectTest3 {
|
||||
|
||||
private KafkaLaneAspect kafkaLaneAspect;
|
||||
private PolarisSDKContextManager polarisSDKContextManager;
|
||||
private KafkaLaneProperties kafkaLaneProperties;
|
||||
private PolarisActiveLane polarisActiveLane;
|
||||
private LaneProto.LaneGroup group;
|
||||
private MockedStatic<LaneUtils> laneUtilsMockedStatic;
|
||||
private MockedStatic<TsfContextUtils> tsfContextUtilsMockedStatic;
|
||||
|
||||
@BeforeEach
|
||||
public void setUp() throws Exception {
|
||||
polarisSDKContextManager = mock(PolarisSDKContextManager.class);
|
||||
group = mock(LaneProto.LaneGroup.class);
|
||||
kafkaLaneProperties = new KafkaLaneProperties();
|
||||
kafkaLaneProperties.setLaneOn(true);
|
||||
polarisActiveLane = new PolarisActiveLane(mock(PolarisSDKContextManager.class), mock(PolarisDiscoveryHandler.class), kafkaLaneProperties, mock(Registration.class));
|
||||
laneUtilsMockedStatic = Mockito.mockStatic(LaneUtils.class);
|
||||
tsfContextUtilsMockedStatic = Mockito.mockStatic(TsfContextUtils.class);
|
||||
|
||||
|
||||
kafkaLaneAspect = new KafkaLaneAspect(polarisSDKContextManager, kafkaLaneProperties, polarisActiveLane);
|
||||
}
|
||||
|
||||
@AfterEach
|
||||
public void tearDown() {
|
||||
laneUtilsMockedStatic.close();
|
||||
tsfContextUtilsMockedStatic.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testConsumerAspectWithLaneHeader_toInLane() throws Throwable {
|
||||
// not in lane
|
||||
Field laneField = PolarisActiveLane.class.getDeclaredField("lane");
|
||||
laneField.setAccessible(true);
|
||||
laneField.set(polarisActiveLane, "");
|
||||
|
||||
Field groupsField = PolarisActiveLane.class.getDeclaredField("groups");
|
||||
groupsField.setAccessible(true);
|
||||
groupsField.set(polarisActiveLane, Collections.singletonList(group));
|
||||
|
||||
Field serviceInLaneField = PolarisActiveLane.class.getDeclaredField("serviceInLane");
|
||||
serviceInLaneField.setAccessible(true);
|
||||
serviceInLaneField.setBoolean(polarisActiveLane, false);
|
||||
|
||||
String laneId = "test-group/test-lane-name"; // valid lane id
|
||||
ConsumerRecord consumerRecord = new ConsumerRecord<>("test-topic", 0, 0L, "key", "value");
|
||||
consumerRecord.headers().add(polarisActiveLane.getLaneHeaderKey(), laneId.getBytes(StandardCharsets.UTF_8));
|
||||
|
||||
ProceedingJoinPoint pjp = mock(ProceedingJoinPoint.class);
|
||||
Object[] args = new Object[] {consumerRecord};
|
||||
when(pjp.getArgs()).thenReturn(args);
|
||||
when(pjp.proceed(args)).thenReturn("result");
|
||||
|
||||
// act
|
||||
Object result = kafkaLaneAspect.aroundConsumerMessage(pjp);
|
||||
|
||||
// verify
|
||||
assertThat(result).isEqualTo(KafkaLaneAspect.EMPTY_OBJECT); // not in lane, not consume
|
||||
|
||||
// in lane
|
||||
|
||||
LaneProto.LaneRule rule = mock(LaneProto.LaneRule.class);
|
||||
when(group.getRulesList()).thenReturn(Collections.singletonList(rule));
|
||||
when(rule.getGroupName()).thenReturn("test-group");
|
||||
when(rule.getName()).thenReturn("test-lane-name");
|
||||
when(rule.getDefaultLabelValue()).thenReturn("test-lane");
|
||||
|
||||
laneField.set(polarisActiveLane, "test-lane");
|
||||
serviceInLaneField.setBoolean(polarisActiveLane, true);
|
||||
|
||||
laneUtilsMockedStatic.when(() -> LaneUtils.buildStainLabel(rule)).thenReturn(laneId);
|
||||
|
||||
result = kafkaLaneAspect.aroundConsumerMessage(pjp);
|
||||
assertThat(result).isEqualTo("result");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testConsumerAspectWithLaneHeader_toNotInLane() throws Throwable {
|
||||
// in lane
|
||||
String laneId = "test-group/test-lane-name"; // valid lane id
|
||||
LaneProto.LaneRule rule = mock(LaneProto.LaneRule.class);
|
||||
when(group.getRulesList()).thenReturn(Collections.singletonList(rule));
|
||||
when(rule.getGroupName()).thenReturn("test-group");
|
||||
when(rule.getName()).thenReturn("test-lane-name");
|
||||
when(rule.getDefaultLabelValue()).thenReturn("test-lane");
|
||||
laneUtilsMockedStatic.when(() -> LaneUtils.buildStainLabel(rule)).thenReturn(laneId);
|
||||
|
||||
Field laneField = PolarisActiveLane.class.getDeclaredField("lane");
|
||||
laneField.setAccessible(true);
|
||||
laneField.set(polarisActiveLane, "test-lane");
|
||||
|
||||
Field groupsField = PolarisActiveLane.class.getDeclaredField("groups");
|
||||
groupsField.setAccessible(true);
|
||||
groupsField.set(polarisActiveLane, Collections.singletonList(group));
|
||||
|
||||
Field serviceInLaneField = PolarisActiveLane.class.getDeclaredField("serviceInLane");
|
||||
serviceInLaneField.setAccessible(true);
|
||||
serviceInLaneField.setBoolean(polarisActiveLane, true);
|
||||
|
||||
|
||||
ConsumerRecord consumerRecord = new ConsumerRecord<>("test-topic", 0, 0L, "key", "value");
|
||||
consumerRecord.headers().add(polarisActiveLane.getLaneHeaderKey(), laneId.getBytes(StandardCharsets.UTF_8));
|
||||
|
||||
ProceedingJoinPoint pjp = mock(ProceedingJoinPoint.class);
|
||||
Object[] args = new Object[] {consumerRecord};
|
||||
when(pjp.getArgs()).thenReturn(args);
|
||||
when(pjp.proceed(args)).thenReturn("result");
|
||||
|
||||
// act
|
||||
Object result = kafkaLaneAspect.aroundConsumerMessage(pjp);
|
||||
|
||||
// verify
|
||||
assertThat(result).isEqualTo("result");
|
||||
|
||||
|
||||
// not in lane
|
||||
when(group.getRulesList()).thenReturn(Collections.emptyList());
|
||||
laneField.set(polarisActiveLane, "");
|
||||
serviceInLaneField.setBoolean(polarisActiveLane, false);
|
||||
|
||||
result = kafkaLaneAspect.aroundConsumerMessage(pjp);
|
||||
assertThat(result).isEqualTo(KafkaLaneAspect.EMPTY_OBJECT);
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,142 @@
|
||||
/*
|
||||
* Tencent is pleased to support the open source community by making spring-cloud-tencent available.
|
||||
*
|
||||
* Copyright (C) 2021 Tencent. All rights reserved.
|
||||
*
|
||||
* Licensed under the BSD 3-Clause License (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://opensource.org/licenses/BSD-3-Clause
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software distributed
|
||||
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
|
||||
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations under the License.
|
||||
*/
|
||||
|
||||
package com.tencent.cloud.plugin.mq.lane.kafka;
|
||||
|
||||
import java.lang.reflect.Field;
|
||||
|
||||
import com.tencent.cloud.common.tsf.TsfContextUtils;
|
||||
import com.tencent.cloud.plugin.mq.lane.tsf.TsfActiveLane;
|
||||
import com.tencent.cloud.polaris.context.PolarisSDKContextManager;
|
||||
import com.tencent.cloud.polaris.discovery.PolarisDiscoveryHandler;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import org.springframework.boot.autoconfigure.AutoConfigurations;
|
||||
import org.springframework.boot.test.context.runner.ApplicationContextRunner;
|
||||
import org.springframework.cloud.client.serviceregistry.Registration;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.mockito.Mockito.mock;
|
||||
|
||||
/**
|
||||
* Test for {@link KafkaLaneAspectConfiguration}.
|
||||
*/
|
||||
public class TsfKafkaLaneAspectConfigurationTest {
|
||||
|
||||
private ApplicationContextRunner contextRunner;
|
||||
|
||||
@BeforeEach
|
||||
public void setUp() throws Exception {
|
||||
// Reset onlyTsfConsulEnabled
|
||||
Field onlyTsfConsulEnabledField = TsfContextUtils.class.getDeclaredField("onlyTsfConsulEnabled");
|
||||
onlyTsfConsulEnabledField.setAccessible(true);
|
||||
onlyTsfConsulEnabledField.setBoolean(null, true);
|
||||
|
||||
contextRunner = new ApplicationContextRunner()
|
||||
.withPropertyValues(
|
||||
"tsf_consul_enable=true",
|
||||
"tsf_consul_ip=127.0.0.1",
|
||||
"spring.cloud.polaris.address=" // Empty to simulate only TSF Consul
|
||||
)
|
||||
.withConfiguration(AutoConfigurations.of(KafkaLaneAspectConfiguration.class))
|
||||
.withUserConfiguration(MockConfiguration.class);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testKafkaLaneAspectBeanCreationWhenKafkaTemplatePresent() {
|
||||
// Simulate KafkaTemplate class presence
|
||||
contextRunner
|
||||
.withPropertyValues("spring.cloud.polaris.lane.kafka.lane-on=true")
|
||||
.run(context -> {
|
||||
// KafkaLaneAspect should be created when KafkaTemplate is present
|
||||
assertThat(context).hasSingleBean(KafkaLaneAspect.class);
|
||||
|
||||
KafkaLaneAspect aspect = context.getBean(KafkaLaneAspect.class);
|
||||
assertThat(aspect).isNotNull();
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testKafkaLanePropertiesEnabled() {
|
||||
contextRunner
|
||||
.withPropertyValues(
|
||||
"spring.cloud.polaris.lane.kafka.lane-on=true",
|
||||
"spring.cloud.polaris.lane.kafka.lane-consume-main=true",
|
||||
"spring.cloud.polaris.lane.kafka.main-consume-lane=true"
|
||||
)
|
||||
.run(context -> {
|
||||
KafkaLaneProperties properties = context.getBean(KafkaLaneProperties.class);
|
||||
assertThat(properties.getLaneOn()).isTrue();
|
||||
assertThat(properties.getLaneConsumeMain()).isTrue();
|
||||
assertThat(properties.getMainConsumeLane()).isTrue();
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBeanDependenciesInjection() {
|
||||
contextRunner
|
||||
.withPropertyValues("spring.cloud.polaris.lane.kafka.lane-on=true")
|
||||
.run(context -> {
|
||||
// Verify that all required dependencies are properly injected
|
||||
assertThat(context).hasSingleBean(KafkaLaneAspect.class);
|
||||
|
||||
KafkaLaneAspect aspect = context.getBean(KafkaLaneAspect.class);
|
||||
|
||||
// The aspect should have all required dependencies
|
||||
assertThat(aspect).isNotNull();
|
||||
|
||||
// Verify that KafkaLaneProperties is properly configured
|
||||
KafkaLaneProperties properties = context.getBean(KafkaLaneProperties.class);
|
||||
assertThat(properties).isNotNull();
|
||||
assertThat(properties.getLaneOn()).isTrue();
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTsfActiveLaneBeanCreationWhenOnlyTsfConsulEnabled() {
|
||||
// Simulate Only TSF Consul enabled condition
|
||||
contextRunner
|
||||
.run(context -> {
|
||||
// TsfActiveLane should be created when only TSF Consul is enabled
|
||||
assertThat(context).hasSingleBean(TsfActiveLane.class);
|
||||
|
||||
TsfActiveLane tsfActiveLane = context.getBean(TsfActiveLane.class);
|
||||
assertThat(tsfActiveLane).isNotNull();
|
||||
});
|
||||
}
|
||||
|
||||
@Configuration
|
||||
static class MockConfiguration {
|
||||
@Bean
|
||||
public PolarisSDKContextManager polarisSDKContextManager() {
|
||||
return mock(PolarisSDKContextManager.class);
|
||||
}
|
||||
|
||||
@Bean
|
||||
public PolarisDiscoveryHandler polarisDiscoveryHandler() {
|
||||
return mock(PolarisDiscoveryHandler.class);
|
||||
}
|
||||
|
||||
@Bean
|
||||
public Registration registration() {
|
||||
return mock(Registration.class);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,347 @@
|
||||
/*
|
||||
* Tencent is pleased to support the open source community by making spring-cloud-tencent available.
|
||||
*
|
||||
* Copyright (C) 2021 Tencent. All rights reserved.
|
||||
*
|
||||
* Licensed under the BSD 3-Clause License (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://opensource.org/licenses/BSD-3-Clause
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software distributed
|
||||
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
|
||||
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations under the License.
|
||||
*/
|
||||
|
||||
package com.tencent.cloud.plugin.mq.lane.kafka;
|
||||
|
||||
import java.lang.reflect.Field;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
|
||||
import com.tencent.cloud.common.tsf.TsfContextUtils;
|
||||
import com.tencent.cloud.plugin.mq.lane.tsf.TsfActiveLane;
|
||||
import com.tencent.cloud.polaris.context.PolarisSDKContextManager;
|
||||
import com.tencent.cloud.polaris.discovery.PolarisDiscoveryHandler;
|
||||
import com.tencent.polaris.plugins.router.lane.LaneUtils;
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
||||
import org.apache.kafka.clients.producer.ProducerRecord;
|
||||
import org.apache.kafka.common.header.Header;
|
||||
import org.aspectj.lang.ProceedingJoinPoint;
|
||||
import org.junit.jupiter.api.AfterEach;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.mockito.MockedStatic;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import org.springframework.kafka.core.KafkaTemplate;
|
||||
import org.springframework.kafka.support.Acknowledgment;
|
||||
import org.springframework.kafka.support.converter.RecordMessageConverter;
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.messaging.support.MessageBuilder;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
/**
|
||||
* Test for {@link KafkaLaneAspect}, with only tsf consul enabled. Instance not in lane.
|
||||
*/
|
||||
public class TsfKafkaLaneAspectTest2 {
|
||||
|
||||
private KafkaLaneAspect kafkaLaneAspect;
|
||||
private PolarisSDKContextManager polarisSDKContextManager;
|
||||
private KafkaLaneProperties kafkaLaneProperties;
|
||||
private TsfActiveLane tsfActiveLane;
|
||||
private MockedStatic<LaneUtils> laneUtilsMockedStatic;
|
||||
private MockedStatic<TsfContextUtils> tsfContextUtilsMockedStatic;
|
||||
|
||||
@BeforeEach
|
||||
public void setUp() throws Exception {
|
||||
polarisSDKContextManager = mock(PolarisSDKContextManager.class);
|
||||
kafkaLaneProperties = new KafkaLaneProperties();
|
||||
tsfActiveLane = new TsfActiveLane(polarisSDKContextManager, mock(PolarisDiscoveryHandler.class), kafkaLaneProperties);
|
||||
laneUtilsMockedStatic = Mockito.mockStatic(LaneUtils.class);
|
||||
tsfContextUtilsMockedStatic = Mockito.mockStatic(TsfContextUtils.class);
|
||||
tsfContextUtilsMockedStatic.when(TsfContextUtils::isOnlyTsfConsulEnabled).thenReturn(true);
|
||||
|
||||
// reset currentGroupLaneIds
|
||||
Field currentGroupLaneIdsField = TsfActiveLane.class.getDeclaredField("currentGroupLaneIds");
|
||||
currentGroupLaneIdsField.setAccessible(true);
|
||||
currentGroupLaneIdsField.set(tsfActiveLane, new HashSet<>());
|
||||
|
||||
kafkaLaneAspect = new KafkaLaneAspect(polarisSDKContextManager, kafkaLaneProperties, tsfActiveLane);
|
||||
}
|
||||
|
||||
@AfterEach
|
||||
public void tearDown() {
|
||||
laneUtilsMockedStatic.close();
|
||||
tsfContextUtilsMockedStatic.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testProducerAspectWhenLaneDisabled() throws Throwable {
|
||||
// Given
|
||||
kafkaLaneProperties.setLaneOn(false);
|
||||
ProceedingJoinPoint pjp = mock(ProceedingJoinPoint.class);
|
||||
Object[] args = new Object[] {mock(ProducerRecord.class)};
|
||||
when(pjp.getArgs()).thenReturn(args);
|
||||
when(pjp.proceed(args)).thenReturn("result");
|
||||
|
||||
// When
|
||||
Object result = kafkaLaneAspect.aroundProducerMessage(pjp);
|
||||
|
||||
// Then
|
||||
assertThat(result).isEqualTo("result");
|
||||
verify(pjp).proceed(args);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testProducerAspectWhenNoLaneId() throws Throwable {
|
||||
// Given
|
||||
kafkaLaneProperties.setLaneOn(true);
|
||||
laneUtilsMockedStatic.when(() -> LaneUtils.fetchLaneByCaller(any(), any(), any())).thenReturn(null);
|
||||
|
||||
ProceedingJoinPoint pjp = mock(ProceedingJoinPoint.class);
|
||||
Object[] args = new Object[] {mock(ProducerRecord.class)};
|
||||
when(pjp.getArgs()).thenReturn(args);
|
||||
when(pjp.proceed(args)).thenReturn("result");
|
||||
|
||||
// When
|
||||
Object result = kafkaLaneAspect.aroundProducerMessage(pjp);
|
||||
|
||||
// Then
|
||||
assertThat(result).isEqualTo("result");
|
||||
verify(pjp).proceed(args);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testProducerAspectWithProducerRecord() throws Throwable {
|
||||
// Given
|
||||
kafkaLaneProperties.setLaneOn(true);
|
||||
String laneId = "test-lane-id";
|
||||
laneUtilsMockedStatic.when(() -> LaneUtils.fetchLaneByCaller(any(), any(), any())).thenReturn(laneId);
|
||||
|
||||
ProceedingJoinPoint pjp = mock(ProceedingJoinPoint.class);
|
||||
KafkaTemplate kafkaTemplate = mock(KafkaTemplate.class);
|
||||
ProducerRecord producerRecord = new ProducerRecord<>("test-topic", "test-value");
|
||||
|
||||
when(pjp.getTarget()).thenReturn(kafkaTemplate);
|
||||
when(pjp.getArgs()).thenReturn(new Object[] {producerRecord});
|
||||
when(kafkaTemplate.send(producerRecord)).thenReturn(null);
|
||||
|
||||
// When
|
||||
kafkaLaneAspect.aroundProducerMessage(pjp);
|
||||
|
||||
// Then
|
||||
Iterator<Header> headers = producerRecord.headers().headers(tsfActiveLane.getLaneHeaderKey()).iterator();
|
||||
assertThat(headers.hasNext()).isTrue();
|
||||
Header laneHeader = headers.next();
|
||||
assertThat(new String(laneHeader.value(), StandardCharsets.UTF_8)).isEqualTo(laneId);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testProducerAspectWithMessage() throws Throwable {
|
||||
// Given
|
||||
kafkaLaneProperties.setLaneOn(true);
|
||||
String laneId = "test-lane-id";
|
||||
laneUtilsMockedStatic.when(() -> LaneUtils.fetchLaneByCaller(any(), any(), any())).thenReturn(laneId);
|
||||
|
||||
ProceedingJoinPoint pjp = mock(ProceedingJoinPoint.class);
|
||||
KafkaTemplate kafkaTemplate = mock(KafkaTemplate.class);
|
||||
RecordMessageConverter recordMessageConverter = mock(RecordMessageConverter.class);
|
||||
Message message = MessageBuilder.withPayload("test-payload").build();
|
||||
ProducerRecord producerRecord = new ProducerRecord<>("test-topic", "test-value");
|
||||
|
||||
when(pjp.getTarget()).thenReturn(kafkaTemplate);
|
||||
when(pjp.getArgs()).thenReturn(new Object[] {message});
|
||||
when(kafkaTemplate.getDefaultTopic()).thenReturn("test-topic");
|
||||
when(kafkaTemplate.send(producerRecord)).thenReturn(null);
|
||||
when(kafkaTemplate.getMessageConverter()).thenReturn(recordMessageConverter);
|
||||
|
||||
// When
|
||||
kafkaLaneAspect.aroundProducerMessage(pjp);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testConsumerAspectWhenLaneDisabled() throws Throwable {
|
||||
// Given
|
||||
kafkaLaneProperties.setLaneOn(false);
|
||||
ProceedingJoinPoint pjp = mock(ProceedingJoinPoint.class);
|
||||
Object[] args = new Object[] {mock(ConsumerRecord.class)};
|
||||
when(pjp.getArgs()).thenReturn(args);
|
||||
when(pjp.proceed(args)).thenReturn("result");
|
||||
|
||||
// When
|
||||
Object result = kafkaLaneAspect.aroundConsumerMessage(pjp);
|
||||
|
||||
// Then
|
||||
assertThat(result).isEqualTo("result");
|
||||
verify(pjp).proceed(args);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testConsumerAspectWithLaneHeader_LaneIdExist() throws Throwable {
|
||||
// Given
|
||||
kafkaLaneProperties.setLaneOn(true);
|
||||
Field laneActiveMapField = TsfActiveLane.class.getDeclaredField("laneActiveMap");
|
||||
laneActiveMapField.setAccessible(true);
|
||||
HashMap<String, Boolean> laneActiveMap = new HashMap<>();
|
||||
laneActiveMap.put("lane-test", true);
|
||||
laneActiveMapField.set(tsfActiveLane, laneActiveMap);
|
||||
String laneId = "tsf/lane-test"; // valid lane id
|
||||
|
||||
ConsumerRecord consumerRecord = new ConsumerRecord<>("test-topic", 0, 0L, "key", "value");
|
||||
consumerRecord.headers().add(tsfActiveLane.getLaneHeaderKey(), laneId.getBytes(StandardCharsets.UTF_8));
|
||||
|
||||
ProceedingJoinPoint pjp = mock(ProceedingJoinPoint.class);
|
||||
Object[] args = new Object[] {consumerRecord};
|
||||
when(pjp.getArgs()).thenReturn(args);
|
||||
when(pjp.proceed(args)).thenReturn("result");
|
||||
|
||||
// When
|
||||
Object result = kafkaLaneAspect.aroundConsumerMessage(pjp);
|
||||
|
||||
// Then
|
||||
assertThat(result).isEqualTo(KafkaLaneAspect.EMPTY_OBJECT);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testConsumerAspectWithLaneHeader_LaneIdExist2() throws Throwable {
|
||||
// Given
|
||||
kafkaLaneProperties.setLaneOn(true);
|
||||
Field laneActiveMapField = TsfActiveLane.class.getDeclaredField("laneActiveMap");
|
||||
laneActiveMapField.setAccessible(true);
|
||||
HashMap<String, Boolean> laneActiveMap = new HashMap<>();
|
||||
laneActiveMap.put("lane-test", false);
|
||||
laneActiveMapField.set(tsfActiveLane, laneActiveMap);
|
||||
String laneId = "tsf/lane-test"; // valid lane id
|
||||
|
||||
ConsumerRecord consumerRecord = new ConsumerRecord<>("test-topic", 0, 0L, "key", "value");
|
||||
consumerRecord.headers().add(tsfActiveLane.getLaneHeaderKey(), laneId.getBytes(StandardCharsets.UTF_8));
|
||||
|
||||
ProceedingJoinPoint pjp = mock(ProceedingJoinPoint.class);
|
||||
Object[] args = new Object[] {consumerRecord};
|
||||
when(pjp.getArgs()).thenReturn(args);
|
||||
when(pjp.proceed(args)).thenReturn("result");
|
||||
|
||||
// When
|
||||
Object result = kafkaLaneAspect.aroundConsumerMessage(pjp);
|
||||
|
||||
// Then
|
||||
assertThat(result).isEqualTo(KafkaLaneAspect.EMPTY_OBJECT);
|
||||
|
||||
kafkaLaneProperties.setMainConsumeLane(true);
|
||||
result = kafkaLaneAspect.aroundConsumerMessage(pjp);
|
||||
assertThat(result).isEqualTo("result"); // as mainConsumeLane is true
|
||||
|
||||
// reset mainConsumeLane
|
||||
kafkaLaneProperties.setMainConsumeLane(false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetConsumerRecordLaneIdFromHeader() {
|
||||
// Given
|
||||
ConsumerRecord consumerRecord = new ConsumerRecord<>("test-topic", 0, 0L, "key", "value");
|
||||
String expectedLaneId = "test-lane-id";
|
||||
consumerRecord.headers().add(tsfActiveLane.getLaneHeaderKey(), expectedLaneId.getBytes(StandardCharsets.UTF_8));
|
||||
|
||||
// When
|
||||
String laneId = kafkaLaneAspect.getConsumerRecordLaneId(consumerRecord);
|
||||
|
||||
// Then
|
||||
assertThat(laneId).isEqualTo(expectedLaneId);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetConsumerRecordLaneIdFromCallerLane() {
|
||||
// Given
|
||||
ConsumerRecord consumerRecord = new ConsumerRecord<>("test-topic", 0, 0L, "key", "value");
|
||||
String expectedLaneId = "lane-test";
|
||||
laneUtilsMockedStatic.when(() -> LaneUtils.getCallerLaneId()).thenReturn(expectedLaneId);
|
||||
|
||||
// When
|
||||
String laneId = kafkaLaneAspect.getConsumerRecordLaneId(consumerRecord);
|
||||
|
||||
// Then
|
||||
assertThat(laneId).isEqualTo("tsf/" + expectedLaneId);
|
||||
}
|
||||
|
||||
/**
|
||||
* instance not in lane, act as baseline service.
|
||||
*/
|
||||
@Test
|
||||
public void testIfConsumeWithNoLaneId() throws Exception {
|
||||
|
||||
// Given
|
||||
kafkaLaneProperties.setLaneConsumeMain(true);
|
||||
// When
|
||||
boolean shouldConsume = kafkaLaneAspect.ifConsume("");
|
||||
// Then
|
||||
assertThat(shouldConsume).isTrue(); // baseline service
|
||||
|
||||
// Given
|
||||
kafkaLaneProperties.setLaneConsumeMain(false);
|
||||
// When
|
||||
shouldConsume = kafkaLaneAspect.ifConsume("");
|
||||
// Then
|
||||
assertThat(shouldConsume).isTrue(); // unrelated to LaneConsumeMain
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testConsumerAspectWithBatchMessages() throws Throwable {
|
||||
// Given
|
||||
kafkaLaneProperties.setLaneOn(true);
|
||||
|
||||
List<ConsumerRecord> messageList = new ArrayList<>();
|
||||
ConsumerRecord record1 = new ConsumerRecord<>("test-topic", 0, 0L, "key", "value1");
|
||||
record1.headers().add(tsfActiveLane.getLaneHeaderKey(), "lane1".getBytes(StandardCharsets.UTF_8));
|
||||
ConsumerRecord record2 = new ConsumerRecord<>("test-topic", 0, 1L, "key", "value2");
|
||||
record2.headers().add(tsfActiveLane.getLaneHeaderKey(), "lane2".getBytes(StandardCharsets.UTF_8));
|
||||
|
||||
messageList.add(record1);
|
||||
messageList.add(record2);
|
||||
|
||||
Acknowledgment acknowledgment = mock(Acknowledgment.class);
|
||||
ProceedingJoinPoint pjp = mock(ProceedingJoinPoint.class);
|
||||
Object[] args = new Object[] {messageList, acknowledgment};
|
||||
|
||||
when(pjp.getArgs()).thenReturn(args);
|
||||
when(pjp.proceed(any())).thenReturn("result");
|
||||
|
||||
// When
|
||||
Object result = kafkaLaneAspect.aroundConsumerMessage(pjp);
|
||||
|
||||
// Then
|
||||
assertThat(result).isEqualTo("result");
|
||||
verify(pjp).proceed(any());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testConsumerAspectWithEmptyBatch() throws Throwable {
|
||||
// Given
|
||||
kafkaLaneProperties.setLaneOn(true);
|
||||
|
||||
List<ConsumerRecord> emptyList = new ArrayList<>();
|
||||
ProceedingJoinPoint pjp = mock(ProceedingJoinPoint.class);
|
||||
Object[] args = new Object[] {emptyList};
|
||||
|
||||
when(pjp.getArgs()).thenReturn(args);
|
||||
when(pjp.proceed(args)).thenReturn("result");
|
||||
|
||||
// When
|
||||
Object result = kafkaLaneAspect.aroundConsumerMessage(pjp);
|
||||
|
||||
// Then
|
||||
assertThat(result).isEqualTo("result");
|
||||
verify(pjp).proceed(args);
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,149 @@
|
||||
/*
|
||||
* Tencent is pleased to support the open source community by making spring-cloud-tencent available.
|
||||
*
|
||||
* Copyright (C) 2021 Tencent. All rights reserved.
|
||||
*
|
||||
* Licensed under the BSD 3-Clause License (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://opensource.org/licenses/BSD-3-Clause
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software distributed
|
||||
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
|
||||
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations under the License.
|
||||
*/
|
||||
|
||||
package com.tencent.cloud.plugin.mq.lane.kafka;
|
||||
|
||||
import java.lang.reflect.Field;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
|
||||
import com.tencent.cloud.common.tsf.TsfContextUtils;
|
||||
import com.tencent.cloud.plugin.mq.lane.tsf.TsfActiveLane;
|
||||
import com.tencent.cloud.polaris.context.PolarisSDKContextManager;
|
||||
import com.tencent.cloud.polaris.discovery.PolarisDiscoveryHandler;
|
||||
import com.tencent.polaris.plugins.router.lane.LaneUtils;
|
||||
import com.tencent.polaris.specification.api.v1.traffic.manage.LaneProto;
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
||||
import org.aspectj.lang.ProceedingJoinPoint;
|
||||
import org.junit.jupiter.api.AfterEach;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.mockito.MockedStatic;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
/**
|
||||
* Test for {@link KafkaLaneAspect}.
|
||||
* Instance and service status change.
|
||||
*/
|
||||
public class TsfKafkaLaneAspectTest3 {
|
||||
|
||||
private KafkaLaneAspect kafkaLaneAspect;
|
||||
private PolarisSDKContextManager polarisSDKContextManager;
|
||||
private KafkaLaneProperties kafkaLaneProperties;
|
||||
private TsfActiveLane tsfActiveLane;
|
||||
private LaneProto.LaneGroup group;
|
||||
private MockedStatic<LaneUtils> laneUtilsMockedStatic;
|
||||
private MockedStatic<TsfContextUtils> tsfContextUtilsMockedStatic;
|
||||
|
||||
@BeforeEach
|
||||
public void setUp() throws Exception {
|
||||
polarisSDKContextManager = mock(PolarisSDKContextManager.class);
|
||||
group = mock(LaneProto.LaneGroup.class);
|
||||
kafkaLaneProperties = new KafkaLaneProperties();
|
||||
kafkaLaneProperties.setLaneOn(true);
|
||||
tsfActiveLane = new TsfActiveLane(mock(PolarisSDKContextManager.class), mock(PolarisDiscoveryHandler.class), kafkaLaneProperties);
|
||||
laneUtilsMockedStatic = Mockito.mockStatic(LaneUtils.class);
|
||||
tsfContextUtilsMockedStatic = Mockito.mockStatic(TsfContextUtils.class);
|
||||
|
||||
|
||||
kafkaLaneAspect = new KafkaLaneAspect(polarisSDKContextManager, kafkaLaneProperties, tsfActiveLane);
|
||||
}
|
||||
|
||||
@AfterEach
|
||||
public void tearDown() {
|
||||
laneUtilsMockedStatic.close();
|
||||
tsfContextUtilsMockedStatic.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testConsumerAspectWithLaneHeader_toInLane() throws Throwable {
|
||||
// not in lane
|
||||
|
||||
Field laneActiveMapField = TsfActiveLane.class.getDeclaredField("laneActiveMap");
|
||||
laneActiveMapField.setAccessible(true);
|
||||
HashMap<String, Boolean> laneActiveMap = new HashMap<>();
|
||||
laneActiveMap.put("lane-name", true);
|
||||
laneActiveMapField.set(tsfActiveLane, laneActiveMap);
|
||||
|
||||
String laneId = "tsf/lane-name"; // valid lane id
|
||||
ConsumerRecord consumerRecord = new ConsumerRecord<>("test-topic", 0, 0L, "key", "value");
|
||||
consumerRecord.headers().add(tsfActiveLane.getLaneHeaderKey(), laneId.getBytes(StandardCharsets.UTF_8));
|
||||
|
||||
ProceedingJoinPoint pjp = mock(ProceedingJoinPoint.class);
|
||||
Object[] args = new Object[] {consumerRecord};
|
||||
when(pjp.getArgs()).thenReturn(args);
|
||||
when(pjp.proceed(args)).thenReturn("result");
|
||||
|
||||
// act
|
||||
Object result = kafkaLaneAspect.aroundConsumerMessage(pjp);
|
||||
|
||||
// verify
|
||||
assertThat(result).isEqualTo(KafkaLaneAspect.EMPTY_OBJECT); // not in lane, not consume
|
||||
|
||||
// in lane
|
||||
Field currentGroupLaneIdsField = TsfActiveLane.class.getDeclaredField("currentGroupLaneIds");
|
||||
currentGroupLaneIdsField.setAccessible(true);
|
||||
currentGroupLaneIdsField.set(tsfActiveLane, new HashSet<>(Collections.singletonList("lane-name")));
|
||||
|
||||
result = kafkaLaneAspect.aroundConsumerMessage(pjp);
|
||||
assertThat(result).isEqualTo("result");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testConsumerAspectWithLaneHeader_toNotInLane() throws Throwable {
|
||||
// in lane
|
||||
String laneId = "tsf/lane-name"; // valid lane id
|
||||
|
||||
Field currentGroupLaneIdsField = TsfActiveLane.class.getDeclaredField("currentGroupLaneIds");
|
||||
currentGroupLaneIdsField.setAccessible(true);
|
||||
currentGroupLaneIdsField.set(tsfActiveLane, new HashSet<>(Collections.singletonList("lane-name")));
|
||||
|
||||
Field laneActiveMapField = TsfActiveLane.class.getDeclaredField("laneActiveMap");
|
||||
laneActiveMapField.setAccessible(true);
|
||||
HashMap<String, Boolean> laneActiveMap = new HashMap<>();
|
||||
laneActiveMap.put("lane-name", true);
|
||||
laneActiveMapField.set(tsfActiveLane, laneActiveMap);
|
||||
|
||||
|
||||
ConsumerRecord consumerRecord = new ConsumerRecord<>("test-topic", 0, 0L, "key", "value");
|
||||
consumerRecord.headers().add(tsfActiveLane.getLaneHeaderKey(), laneId.getBytes(StandardCharsets.UTF_8));
|
||||
|
||||
ProceedingJoinPoint pjp = mock(ProceedingJoinPoint.class);
|
||||
Object[] args = new Object[] {consumerRecord};
|
||||
when(pjp.getArgs()).thenReturn(args);
|
||||
when(pjp.proceed(args)).thenReturn("result");
|
||||
|
||||
// act
|
||||
Object result = kafkaLaneAspect.aroundConsumerMessage(pjp);
|
||||
|
||||
// verify
|
||||
assertThat(result).isEqualTo("result");
|
||||
|
||||
|
||||
// not in lane
|
||||
currentGroupLaneIdsField.set(tsfActiveLane, new HashSet<>());
|
||||
|
||||
result = kafkaLaneAspect.aroundConsumerMessage(pjp);
|
||||
assertThat(result).isEqualTo(KafkaLaneAspect.EMPTY_OBJECT);
|
||||
}
|
||||
}
|
||||
Loading…
Reference in new issue