Merge pull request #1079 from BigXin0109/kafka

feat: kafka support #229 #231 #269
pull/1103/head
BigXin0109 2 years ago committed by GitHub
commit 64a7656dff
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -0,0 +1,24 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>cn.hippo4j</groupId>
<artifactId>hippo4j-adapter</artifactId>
<version>${revision}</version>
</parent>
<artifactId>hippo4j-adapter-kafka</artifactId>
<dependencies>
<dependency>
<groupId>cn.hippo4j</groupId>
<artifactId>hippo4j-adapter-base</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<optional>true</optional>
</dependency>
</dependencies>
</project>

@ -0,0 +1,208 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.adapter.kafka;
import cn.hippo4j.adapter.base.ThreadPoolAdapter;
import cn.hippo4j.adapter.base.ThreadPoolAdapterParameter;
import cn.hippo4j.adapter.base.ThreadPoolAdapterState;
import cn.hippo4j.common.config.ApplicationContextHolder;
import cn.hippo4j.common.toolkit.ReflectUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.context.event.ApplicationStartedEvent;
import org.springframework.cglib.core.Constants;
import org.springframework.context.ApplicationListener;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.kafka.support.TopicPartitionOffset;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import static cn.hippo4j.common.constant.ChangeThreadPoolConstants.CHANGE_DELIMITER;
/**
* Kafka thread-pool adapter.
*/
@Slf4j
public class KafkaThreadPoolAdapter implements ThreadPoolAdapter, ApplicationListener<ApplicationStartedEvent> {
private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
@Override
public String mark() {
return "Kafka";
}
@Override
public ThreadPoolAdapterState getThreadPoolState(String identify) {
ThreadPoolAdapterState result = new ThreadPoolAdapterState();
MessageListenerContainer listenerContainer = kafkaListenerEndpointRegistry.getListenerContainer(identify);
if (listenerContainer == null) {
log.warn("[{}] Kafka consuming thread pool not found.", identify);
return result;
}
result.setThreadPoolKey(identify);
if (listenerContainer instanceof ConcurrentMessageListenerContainer) {
result.setCoreSize(((ConcurrentMessageListenerContainer<?, ?>) listenerContainer).getConcurrency());
result.setMaximumSize(result.getCoreSize());
} else {
result.setCoreSize(1);
result.setMaximumSize(1);
}
return result;
}
@Override
public List<ThreadPoolAdapterState> getThreadPoolStates() {
List<ThreadPoolAdapterState> adapterStateList = new ArrayList<>();
kafkaListenerEndpointRegistry.getListenerContainerIds().forEach(id -> adapterStateList.add(getThreadPoolState(id)));
return adapterStateList;
}
@Override
public boolean updateThreadPool(ThreadPoolAdapterParameter threadPoolAdapterParameter) {
String threadPoolKey = threadPoolAdapterParameter.getThreadPoolKey();
MessageListenerContainer listenerContainer = kafkaListenerEndpointRegistry.getListenerContainer(threadPoolKey);
if (listenerContainer == null) {
log.warn("[{}] Kafka consuming thread pool not found.", threadPoolKey);
return false;
}
if (!(listenerContainer instanceof ConcurrentMessageListenerContainer)) {
log.warn("[{}] Kafka consuming thread pool not support modify.", threadPoolKey);
return false;
}
ConcurrentMessageListenerContainer concurrentContainer = (ConcurrentMessageListenerContainer) listenerContainer;
int originalCoreSize = concurrentContainer.getConcurrency();
int originalMaximumPoolSize = originalCoreSize;
Integer concurrency = threadPoolAdapterParameter.getCorePoolSize();
if (originalCoreSize < concurrency) {
// add consumer
if (!addConsumer(threadPoolKey, concurrentContainer, originalCoreSize, concurrency)) {
return false;
}
} else {
// stop consumer
decreaseConsumer(threadPoolKey, concurrentContainer, originalCoreSize, concurrency);
}
concurrentContainer.setConcurrency(concurrency);
log.info("[{}] Kafka consumption thread pool parameter change. coreSize: {}, maximumSize: {}",
threadPoolKey,
String.format(CHANGE_DELIMITER, originalCoreSize, concurrency),
String.format(CHANGE_DELIMITER, originalMaximumPoolSize, concurrency));
return true;
}
/**
* @param threadPoolKey
* @param concurrentContainer
* @param originalCoreSize
* @param concurrency
* @since org.springframework.kafka.listener.ConcurrentMessageListenerContainer.doStop
*/
private static void decreaseConsumer(String threadPoolKey, ConcurrentMessageListenerContainer concurrentContainer, int originalCoreSize, Integer concurrency) {
int targetDecrease = originalCoreSize - concurrency;
List<KafkaMessageListenerContainer> containers = (List) ReflectUtil.getFieldValue(concurrentContainer, "containers");
Iterator<KafkaMessageListenerContainer> iterator = containers.iterator();
int count = 0;
while (iterator.hasNext() && count < targetDecrease) {
KafkaMessageListenerContainer container = iterator.next();
if (container.isRunning()) {
container.stop(() -> {
});
count++;
}
}
log.info("[{}] Kafka consumption change. target decrease {} ,real decrease {}", threadPoolKey, targetDecrease, count);
}
/**
* @param threadPoolKey
* @param concurrentContainer
* @param originalCoreSize
* @param concurrency
* @return true success
* @since org.springframework.kafka.listener.ConcurrentMessageListenerContainer#doStart()
*/
private static boolean addConsumer(String threadPoolKey, ConcurrentMessageListenerContainer concurrentContainer, int originalCoreSize, Integer concurrency) {
ContainerProperties containerProperties = concurrentContainer.getContainerProperties();
TopicPartitionOffset[] topicPartitions = containerProperties.getTopicPartitions();
if (topicPartitions != null && concurrency > topicPartitions.length) {
log.warn("[{}] Kafka consuming thread pool not support modify. " +
"When specific partitions are provided, the concurrency must be less than or "
+ "equal to the number of partitions;", threadPoolKey);
return false;
}
List<KafkaMessageListenerContainer> containers = (List) ReflectUtil.getFieldValue(concurrentContainer, "containers");
boolean alwaysClientIdSuffix = (Boolean) ReflectUtil.getFieldValue(concurrentContainer, "alwaysClientIdSuffix");
int size = containers.size();
for (int i = size; i < concurrency - originalCoreSize + size; i++) {
KafkaMessageListenerContainer container = ReflectUtil.invoke(concurrentContainer, "constructContainer", containerProperties, topicPartitions, i);
String beanName = concurrentContainer.getBeanName();
container.setBeanName((beanName != null ? beanName : "consumer") + "-" + i);
container.setApplicationContext(ApplicationContextHolder.getInstance());
if (concurrentContainer.getApplicationEventPublisher() != null) {
container.setApplicationEventPublisher(concurrentContainer.getApplicationEventPublisher());
}
container.setClientIdSuffix(concurrency > 1 || alwaysClientIdSuffix ? "-" + i : "");
container.setGenericErrorHandler(ReflectUtil.invoke(concurrentContainer, "getGenericErrorHandler"));
container.setAfterRollbackProcessor(ReflectUtil.invoke(concurrentContainer, "getAfterRollbackProcessor"));
Method getRecordInterceptor = ReflectUtil.findDeclaredMethod(concurrentContainer.getClass(), "getRecordInterceptor", Constants.EMPTY_CLASS_ARRAY);
ReflectUtil.setAccessible(getRecordInterceptor);
container.setRecordInterceptor(ReflectUtil.invoke(concurrentContainer, getRecordInterceptor));
Method isInterceptBeforeTx = ReflectUtil.findDeclaredMethod(concurrentContainer.getClass(), "isInterceptBeforeTx", Constants.EMPTY_CLASS_ARRAY);
ReflectUtil.setAccessible(isInterceptBeforeTx);
container.setInterceptBeforeTx(ReflectUtil.invoke(concurrentContainer, isInterceptBeforeTx));
container.setEmergencyStop(() -> {
concurrentContainer.stop(() -> {
});
ReflectUtil.invoke(concurrentContainer, "publishContainerStoppedEvent");
});
Method isPaused = ReflectUtil.findDeclaredMethod(concurrentContainer.getClass(), "isPaused", Constants.EMPTY_CLASS_ARRAY);
ReflectUtil.setAccessible(isPaused);
if (ReflectUtil.invoke(concurrentContainer, isPaused)) {
container.pause();
}
container.start();
containers.add(container);
}
return true;
}
@Override
public void onApplicationEvent(ApplicationStartedEvent event) {
try {
KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry = ApplicationContextHolder.getBean(KafkaListenerEndpointRegistry.class);
this.kafkaListenerEndpointRegistry = kafkaListenerEndpointRegistry;
} catch (Exception ex) {
log.error("Failed to get Kafka thread pool.", ex);
}
}
}

@ -17,6 +17,7 @@
<module>hippo4j-adapter-alibaba-dubbo</module>
<module>hippo4j-adapter-rabbitmq</module>
<module>hippo4j-adapter-rocketmq</module>
<module>hippo4j-adapter-kafka</module>
<module>hippo4j-adapter-hystrix</module>
<module>hippo4j-adapter-spring-cloud-stream-rocketmq</module>
<module>hippo4j-adapter-spring-cloud-stream-rabbitmq</module>

@ -20,6 +20,7 @@ package cn.hippo4j.common.toolkit;
import cn.hippo4j.common.web.exception.IllegalException;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import org.springframework.cglib.core.ReflectUtils;
import java.lang.reflect.AccessibleObject;
import java.lang.reflect.Field;
@ -234,6 +235,28 @@ public class ReflectUtil {
return null;
}
/**
* Invoke.
*
* @param obj the obj
* @param methodName the method Name
* @param arguments parameters
* @return result for zhe method
*/
@SuppressWarnings("unchecked")
public static <T> T invoke(Object obj, String methodName, Object... arguments) {
try {
Method method = ReflectUtil.getMethodByName(obj.getClass(), methodName);
if (method == null) {
throw new IllegalException(methodName + "method not exists");
}
ReflectUtil.setAccessible(method);
return (T) method.invoke(obj, arguments);
} catch (IllegalAccessException | InvocationTargetException e) {
throw new IllegalException(e);
}
}
/**
* Invoke.
*
@ -283,4 +306,19 @@ public class ReflectUtil {
}
return null;
}
/**
*
* @param clazz
* @param methodName
* @param parameterTypes
* @return
*/
public static Method findDeclaredMethod(Class clazz, String methodName, Class[] parameterTypes) {
try {
return ReflectUtils.findDeclaredMethod(clazz, methodName, parameterTypes);
} catch (NoSuchMethodException e) {
throw new RuntimeException(e);
}
}
}

@ -0,0 +1,49 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>cn.hippo4j</groupId>
<artifactId>hippo4j-example</artifactId>
<version>${revision}</version>
</parent>
<artifactId>hippo4j-spring-boot-starter-adapter-kafka-example</artifactId>
<properties>
<maven.deploy.skip>true</maven.deploy.skip>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>cn.hippo4j</groupId>
<artifactId>hippo4j-spring-boot-starter-adapter-kafka</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>cn.hippo4j</groupId>
<artifactId>hippo4j-spring-boot-starter</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>cn.hippo4j</groupId>
<artifactId>hippo4j-example-core</artifactId>
<version>${revision}</version>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
</dependencies>
</project>

@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.springboot.starter.adapter.kafka.example;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
/**
* Message consume.
*/
@Slf4j
@Service
public class MessageConsume {
@KafkaListener(id = "testId1", topics = MessageProduce.TOPIC, groupId = "testGroup")
public void onMessage(String message) {
log.info("Message: {}", message);
}
}

@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.springboot.starter.adapter.kafka.example;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.PostConstruct;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
/**
* Message produce.
*/
@Slf4j
@Component
@RestController
@AllArgsConstructor
public class MessageProduce {
public static final String TOPIC = "test";
private final KafkaTemplate template;
@GetMapping("/message/send")
public String sendMessage() {
template.send(TOPIC, "testMessage");
return "success";
}
@PostConstruct
public void init() {
Thread t = new Thread(() -> {
while (true) {
String message = UUID.randomUUID().toString();
template.send(TOPIC, "autoTestMessage " + message);
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
t.start();
}
}

@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.springboot.starter.adapter.kafka.example;
import cn.hippo4j.core.enable.EnableDynamicThreadPool;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@EnableDynamicThreadPool
@SpringBootApplication(scanBasePackages = {"cn.hippo4j.example.core", "cn.hippo4j.springboot.starter.adapter.kafka.example"})
public class ServerAdapterKafkaExampleApplication {
public static void main(String[] args) {
SpringApplication.run(ServerAdapterKafkaExampleApplication.class, args);
}
}

@ -0,0 +1,13 @@
server.port=8011
debug=true
spring.application.name=hippo4j-spring-boot-starter-adapter-kafka-example
spring.dynamic.thread-pool.server-addr=http://localhost:6691
spring.dynamic.thread-pool.namespace=prescription
spring.dynamic.thread-pool.item-id=dynamic-threadpool-example
spring.dynamic.thread-pool.username=admin
spring.dynamic.thread-pool.password=123456
spring.kafka.bootstrap-servers=localhost:9093,localhost:9094,localhost:9095

@ -26,6 +26,7 @@
<module>hippo4j-spring-boot-starter-adapter-spring-cloud-stream-rabbitmq-example</module>
<module>hippo4j-spring-boot-starter-adapter-spring-cloud-stream-rocketmq-example</module>
<module>hippo4j-spring-boot-starter-adapter-rocketmq-example</module>
<module>hippo4j-spring-boot-starter-adapter-kafka-example</module>
<module>hippo4j-config-etcd-spring-boot-starter-example</module>
<module>hippo4j-config-nacos-spring-boot-1x-starter-example</module>
<module>hippo4j-config-apollo-spring-boot-1x-starter-example</module>

@ -0,0 +1,24 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>cn.hippo4j</groupId>
<artifactId>hippo4j-spring-boot-starter-adapter</artifactId>
<version>${revision}</version>
</parent>
<artifactId>hippo4j-spring-boot-starter-adapter-kafka</artifactId>
<dependencies>
<dependency>
<groupId>cn.hippo4j</groupId>
<artifactId>hippo4j-adapter-kafka</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<optional>true</optional>
</dependency>
</dependencies>
</project>

@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.springboot.starter.adapter.kafka;
import cn.hippo4j.adapter.kafka.KafkaThreadPoolAdapter;
import cn.hippo4j.common.config.ApplicationContextHolder;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaTemplate;
/**
* Kafka adapter auto configuration.
*/
@Configuration
@AutoConfigureAfter(KafkaAutoConfiguration.class)
public class KafkaAdapterAutoConfiguration {
@Bean
@ConditionalOnMissingBean
public ApplicationContextHolder simpleApplicationContextHolder() {
return new ApplicationContextHolder();
}
@Bean
@SuppressWarnings("all")
@ConditionalOnBean(KafkaTemplate.class)
public KafkaThreadPoolAdapter kafkaThreadPoolAdapter(ApplicationContextHolder applicationContextHolder) {
return new KafkaThreadPoolAdapter();
}
}

@ -0,0 +1 @@
org.springframework.boot.autoconfigure.EnableAutoConfiguration=cn.hippo4j.springboot.starter.adapter.kafka.KafkaAdapterAutoConfiguration

@ -17,6 +17,7 @@
<module>hippo4j-spring-boot-starter-adapter-alibaba-dubbo</module>
<module>hippo4j-spring-boot-starter-adapter-rabbitmq</module>
<module>hippo4j-spring-boot-starter-adapter-rocketmq</module>
<module>hippo4j-spring-boot-starter-adapter-kafka</module>
<module>hippo4j-spring-boot-starter-adapter-hystrix</module>
<module>hippo4j-spring-boot-starter-adapter-spring-cloud-stream-rocketmq</module>
<module>hippo4j-spring-boot-starter-adapter-spring-cloud-stream-rabbitmq</module>

Loading…
Cancel
Save