pull/1784/head
shedfreewu 2 weeks ago
parent 4887f544bf
commit 1b5e678bb7

@ -29,7 +29,7 @@ import com.tencent.polaris.client.pojo.ServiceInstancesByProto;
public abstract class AbstractActiveLane extends AbstractResourceEventListener {
public abstract boolean ifConsume(String laneId);
public abstract boolean ifConsume(String laneId, MqLaneProperties mqLaneProperties);
public abstract String getLaneHeaderKey();

@ -0,0 +1,29 @@
/*
* Tencent is pleased to support the open source community by making spring-cloud-tencent available.
*
* Copyright (C) 2021 Tencent. All rights reserved.
*
* Licensed under the BSD 3-Clause License (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.tencent.cloud.plugin.mq.lane;
public interface MqLaneProperties {
Boolean getLaneOn();
Boolean getLaneConsumeMain();
Boolean getMainConsumeLane();
Boolean getAutoSetHeader();
}

@ -23,7 +23,6 @@ import java.util.Optional;
import com.tencent.cloud.common.metadata.MetadataContext;
import com.tencent.cloud.common.util.JacksonUtils;
import com.tencent.cloud.plugin.mq.lane.kafka.KafkaLaneProperties;
import com.tencent.cloud.polaris.context.PolarisSDKContextManager;
import com.tencent.cloud.polaris.discovery.PolarisDiscoveryHandler;
import com.tencent.polaris.api.config.Configuration;
@ -56,8 +55,6 @@ public class PolarisActiveLane extends AbstractActiveLane implements Initializin
private final PolarisDiscoveryHandler discoveryClient;
private final KafkaLaneProperties kafkaLaneProperties;
@Value("${spring.application.name:}")
private String springApplicationName;
/**
@ -73,11 +70,10 @@ public class PolarisActiveLane extends AbstractActiveLane implements Initializin
private BaseLaneMode baseLaneMode = BaseLaneMode.ONLY_UNTAGGED_INSTANCE;
public PolarisActiveLane(PolarisSDKContextManager polarisSDKContextManager, PolarisDiscoveryHandler discoveryClient,
KafkaLaneProperties kafkaLaneProperties, Registration registration) {
public PolarisActiveLane(PolarisSDKContextManager polarisSDKContextManager,
PolarisDiscoveryHandler discoveryClient, Registration registration) {
this.polarisSDKContextManager = polarisSDKContextManager;
this.discoveryClient = discoveryClient;
this.kafkaLaneProperties = kafkaLaneProperties;
this.registration = registration;
Optional.ofNullable(polarisSDKContextManager).map(PolarisSDKContextManager::getSDKContext)
.map(SDKContext::getExtensions).map(Extensions::getLocalRegistry)
@ -172,7 +168,7 @@ public class PolarisActiveLane extends AbstractActiveLane implements Initializin
}
@Override
public boolean ifConsume(String messageLaneId) {
public boolean ifConsume(String messageLaneId, MqLaneProperties mqLaneProperties) {
// message has no lane id
if (StringUtils.isEmpty(messageLaneId)) {
if (!currentInstanceInLane()) {
@ -181,7 +177,7 @@ public class PolarisActiveLane extends AbstractActiveLane implements Initializin
}
else {
// lane listener consumes baseline message
return this.kafkaLaneProperties.getLaneConsumeMain();
return mqLaneProperties.getLaneConsumeMain();
}
}
else {
@ -190,7 +186,7 @@ public class PolarisActiveLane extends AbstractActiveLane implements Initializin
// message has lane id
if (!currentInstanceInLane()) {
// baseline service
return this.kafkaLaneProperties.getMainConsumeLane();
return mqLaneProperties.getMainConsumeLane();
}
else {
// whether the message lane id is the same as the lane id of the listener

@ -262,6 +262,6 @@ public class KafkaLaneAspect {
* @return whether to consume.
*/
boolean ifConsume(String messageLaneId) {
return activeLane.ifConsume(messageLaneId);
return activeLane.ifConsume(messageLaneId, kafkaLaneProperties);
}
}

@ -46,13 +46,13 @@ public class KafkaLaneAspectConfiguration {
@Bean
@ConditionalOnClass(name = {"org.springframework.kafka.core.KafkaTemplate"})
@ConditionalOnMissingBean
public AbstractActiveLane activeLane(PolarisSDKContextManager polarisSDKContextManager, PolarisDiscoveryHandler discoveryClient,
KafkaLaneProperties kafkaLaneProperties, Registration registration) {
public AbstractActiveLane activeLane(PolarisSDKContextManager polarisSDKContextManager,
PolarisDiscoveryHandler discoveryClient, Registration registration) {
if (TsfContextUtils.isOnlyTsfConsulEnabled()) {
return new TsfActiveLane(polarisSDKContextManager, discoveryClient, kafkaLaneProperties);
return new TsfActiveLane(polarisSDKContextManager, discoveryClient);
}
else {
return new PolarisActiveLane(polarisSDKContextManager, discoveryClient, kafkaLaneProperties, registration);
return new PolarisActiveLane(polarisSDKContextManager, discoveryClient, registration);
}
}
}

@ -17,10 +17,12 @@
package com.tencent.cloud.plugin.mq.lane.kafka;
import com.tencent.cloud.plugin.mq.lane.MqLaneProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
@ConfigurationProperties(prefix = "spring.cloud.polaris.lane.kafka")
public class KafkaLaneProperties {
public class KafkaLaneProperties implements MqLaneProperties {
/**
* enable kafka lane.

@ -27,7 +27,7 @@ import java.util.Set;
import com.tencent.cloud.common.util.JacksonUtils;
import com.tencent.cloud.plugin.mq.lane.AbstractActiveLane;
import com.tencent.cloud.plugin.mq.lane.LaneRuleListener;
import com.tencent.cloud.plugin.mq.lane.kafka.KafkaLaneProperties;
import com.tencent.cloud.plugin.mq.lane.MqLaneProperties;
import com.tencent.cloud.polaris.context.PolarisSDKContextManager;
import com.tencent.cloud.polaris.discovery.PolarisDiscoveryHandler;
import com.tencent.polaris.api.plugin.compose.Extensions;
@ -55,8 +55,6 @@ public class TsfActiveLane extends AbstractActiveLane implements InitializingBea
private final PolarisDiscoveryHandler discoveryClient;
private final KafkaLaneProperties kafkaLaneProperties;
/**
* Online deployment groups for this service (same namespace id and application id required).
*/
@ -81,10 +79,9 @@ public class TsfActiveLane extends AbstractActiveLane implements InitializingBea
@Value("${spring.application.name:}")
private String springApplicationName;
public TsfActiveLane(PolarisSDKContextManager polarisSDKContextManager, PolarisDiscoveryHandler discoveryClient, KafkaLaneProperties kafkaLaneProperties) {
public TsfActiveLane(PolarisSDKContextManager polarisSDKContextManager, PolarisDiscoveryHandler discoveryClient) {
this.polarisSDKContextManager = polarisSDKContextManager;
this.discoveryClient = discoveryClient;
this.kafkaLaneProperties = kafkaLaneProperties;
Optional.ofNullable(polarisSDKContextManager).map(PolarisSDKContextManager::getSDKContext)
.map(SDKContext::getExtensions).map(Extensions::getLocalRegistry)
.ifPresent(localRegistry -> localRegistry.registerResourceListener(new LaneRuleListener(this::freshLaneStatus)));
@ -126,7 +123,7 @@ public class TsfActiveLane extends AbstractActiveLane implements InitializingBea
String nsId = healthService.getMetadata().get("TSF_NAMESPACE_ID");
String groupId = healthService.getMetadata().get("TSF_GROUP_ID");
String applicationId = healthService.getMetadata().get("TSF_APPLICATION_ID");
if (tsfNamespaceId.equals(nsId) && tsfApplicationId.equals(applicationId) && StringUtils.isNotEmpty(groupId)) {
if (StringUtils.equals(tsfNamespaceId, nsId) && StringUtils.equals(tsfApplicationId, applicationId) && StringUtils.isNotEmpty(groupId)) {
currentActiveGroupSet.add(groupId);
}
}
@ -194,7 +191,7 @@ public class TsfActiveLane extends AbstractActiveLane implements InitializingBea
}
@Override
public boolean ifConsume(String originMessageLaneId) {
public boolean ifConsume(String originMessageLaneId, MqLaneProperties mqLaneProperties) {
String laneId = originMessageLaneId;
if (laneId != null && laneId.contains("/")) {
laneId = laneId.split("/")[1];
@ -209,7 +206,7 @@ public class TsfActiveLane extends AbstractActiveLane implements InitializingBea
}
else {
// lane listener consumes baseline message
return this.kafkaLaneProperties.getLaneConsumeMain();
return mqLaneProperties.getLaneConsumeMain();
}
}
else {
@ -223,7 +220,7 @@ public class TsfActiveLane extends AbstractActiveLane implements InitializingBea
// message carries lane id, but the current service's lane has deployment groups but is not active (or manually taken offline), consume baseline based on switch configuration, default is not to consume
consume = consume ||
(this.kafkaLaneProperties.getMainConsumeLane() &&
(mqLaneProperties.getMainConsumeLane() &&
isLaneExist(laneId) &&
!isActiveLane(laneId)
);

@ -71,7 +71,7 @@ public class PolarisActiveLaneTest {
when(polarisSDKContextManager.getSDKContext()).thenReturn(mock(SDKContext.class));
registration = mock(Registration.class);
when(registration.getInstanceId()).thenReturn(CURRENT_INSTANCE_ID);
polarisActiveLane = new PolarisActiveLane(polarisSDKContextManager, mock(PolarisDiscoveryHandler.class), kafkaLaneProperties, registration);
polarisActiveLane = new PolarisActiveLane(polarisSDKContextManager, mock(PolarisDiscoveryHandler.class), registration);
laneUtilsMockedStatic = Mockito.mockStatic(LaneUtils.class);
jacksonUtilsMockedStatic = Mockito.mockStatic(JacksonUtils.class);
tsfContextUtilsMockedStatic = Mockito.mockStatic(TsfContextUtils.class);

@ -71,7 +71,7 @@ public class KafkaLaneAspectTest {
polarisSDKContextManager = mock(PolarisSDKContextManager.class);
group = mock(LaneProto.LaneGroup.class);
kafkaLaneProperties = new KafkaLaneProperties();
polarisActiveLane = new PolarisActiveLane(mock(PolarisSDKContextManager.class), mock(PolarisDiscoveryHandler.class), kafkaLaneProperties, mock(Registration.class));
polarisActiveLane = new PolarisActiveLane(mock(PolarisSDKContextManager.class), mock(PolarisDiscoveryHandler.class), mock(Registration.class));
laneUtilsMockedStatic = Mockito.mockStatic(LaneUtils.class);
tsfContextUtilsMockedStatic = Mockito.mockStatic(TsfContextUtils.class);
Field laneField = PolarisActiveLane.class.getDeclaredField("instanceLaneTag");

@ -71,7 +71,7 @@ public class KafkaLaneAspectTest2 {
polarisSDKContextManager = mock(PolarisSDKContextManager.class);
group = mock(LaneProto.LaneGroup.class);
kafkaLaneProperties = new KafkaLaneProperties();
polarisActiveLane = new PolarisActiveLane(mock(PolarisSDKContextManager.class), mock(PolarisDiscoveryHandler.class), kafkaLaneProperties, mock(Registration.class));
polarisActiveLane = new PolarisActiveLane(mock(PolarisSDKContextManager.class), mock(PolarisDiscoveryHandler.class), mock(Registration.class));
laneUtilsMockedStatic = Mockito.mockStatic(LaneUtils.class);
tsfContextUtilsMockedStatic = Mockito.mockStatic(TsfContextUtils.class);

@ -61,7 +61,7 @@ public class KafkaLaneAspectTest3 {
group = mock(LaneProto.LaneGroup.class);
kafkaLaneProperties = new KafkaLaneProperties();
kafkaLaneProperties.setLaneOn(true);
polarisActiveLane = new PolarisActiveLane(mock(PolarisSDKContextManager.class), mock(PolarisDiscoveryHandler.class), kafkaLaneProperties, mock(Registration.class));
polarisActiveLane = new PolarisActiveLane(mock(PolarisSDKContextManager.class), mock(PolarisDiscoveryHandler.class), mock(Registration.class));
laneUtilsMockedStatic = Mockito.mockStatic(LaneUtils.class);
tsfContextUtilsMockedStatic = Mockito.mockStatic(TsfContextUtils.class);

@ -69,7 +69,7 @@ public class TsfKafkaLaneAspectTest {
public void setUp() throws Exception {
polarisSDKContextManager = mock(PolarisSDKContextManager.class);
kafkaLaneProperties = new KafkaLaneProperties();
tsfActiveLane = new TsfActiveLane(polarisSDKContextManager, mock(PolarisDiscoveryHandler.class), kafkaLaneProperties);
tsfActiveLane = new TsfActiveLane(polarisSDKContextManager, mock(PolarisDiscoveryHandler.class));
laneUtilsMockedStatic = Mockito.mockStatic(LaneUtils.class);
tsfContextUtilsMockedStatic = Mockito.mockStatic(TsfContextUtils.class);
tsfContextUtilsMockedStatic.when(TsfContextUtils::isOnlyTsfConsulEnabled).thenReturn(true);

@ -68,7 +68,7 @@ public class TsfKafkaLaneAspectTest2 {
public void setUp() throws Exception {
polarisSDKContextManager = mock(PolarisSDKContextManager.class);
kafkaLaneProperties = new KafkaLaneProperties();
tsfActiveLane = new TsfActiveLane(polarisSDKContextManager, mock(PolarisDiscoveryHandler.class), kafkaLaneProperties);
tsfActiveLane = new TsfActiveLane(polarisSDKContextManager, mock(PolarisDiscoveryHandler.class));
laneUtilsMockedStatic = Mockito.mockStatic(LaneUtils.class);
tsfContextUtilsMockedStatic = Mockito.mockStatic(TsfContextUtils.class);
tsfContextUtilsMockedStatic.when(TsfContextUtils::isOnlyTsfConsulEnabled).thenReturn(true);

@ -61,7 +61,7 @@ public class TsfKafkaLaneAspectTest3 {
group = mock(LaneProto.LaneGroup.class);
kafkaLaneProperties = new KafkaLaneProperties();
kafkaLaneProperties.setLaneOn(true);
tsfActiveLane = new TsfActiveLane(mock(PolarisSDKContextManager.class), mock(PolarisDiscoveryHandler.class), kafkaLaneProperties);
tsfActiveLane = new TsfActiveLane(mock(PolarisSDKContextManager.class), mock(PolarisDiscoveryHandler.class));
laneUtilsMockedStatic = Mockito.mockStatic(LaneUtils.class);
tsfContextUtilsMockedStatic = Mockito.mockStatic(TsfContextUtils.class);

@ -81,7 +81,7 @@ public class TsfActiveLaneTest {
when(polarisSDKContextManager.getSDKContext()).thenReturn(sdkContext);
when(sdkContext.getExtensions()).thenReturn(extensions);
tsfActiveLane = new TsfActiveLane(polarisSDKContextManager, discoveryClient, kafkaLaneProperties);
tsfActiveLane = new TsfActiveLane(polarisSDKContextManager, discoveryClient);
// Set up field values using reflection

Loading…
Cancel
Save