diff --git a/hippo4j-adapter/hippo4j-adapter-kafka/pom.xml b/hippo4j-adapter/hippo4j-adapter-kafka/pom.xml new file mode 100644 index 00000000..b963ef9f --- /dev/null +++ b/hippo4j-adapter/hippo4j-adapter-kafka/pom.xml @@ -0,0 +1,24 @@ + + + 4.0.0 + + cn.hippo4j + hippo4j-adapter + ${revision} + + hippo4j-adapter-kafka + + + + cn.hippo4j + hippo4j-adapter-base + ${project.version} + + + org.springframework.kafka + spring-kafka + true + + + diff --git a/hippo4j-adapter/hippo4j-adapter-kafka/src/main/java/cn/hippo4j/adapter/kafka/KafkaThreadPoolAdapter.java b/hippo4j-adapter/hippo4j-adapter-kafka/src/main/java/cn/hippo4j/adapter/kafka/KafkaThreadPoolAdapter.java new file mode 100644 index 00000000..21e417ac --- /dev/null +++ b/hippo4j-adapter/hippo4j-adapter-kafka/src/main/java/cn/hippo4j/adapter/kafka/KafkaThreadPoolAdapter.java @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.adapter.kafka; + +import cn.hippo4j.adapter.base.ThreadPoolAdapter; +import cn.hippo4j.adapter.base.ThreadPoolAdapterParameter; +import cn.hippo4j.adapter.base.ThreadPoolAdapterState; +import cn.hippo4j.common.config.ApplicationContextHolder; +import cn.hippo4j.common.toolkit.ReflectUtil; +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.context.event.ApplicationStartedEvent; +import org.springframework.cglib.core.Constants; +import org.springframework.context.ApplicationListener; +import org.springframework.kafka.config.KafkaListenerEndpointRegistry; +import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; +import org.springframework.kafka.listener.ContainerProperties; +import org.springframework.kafka.listener.KafkaMessageListenerContainer; +import org.springframework.kafka.listener.MessageListenerContainer; +import org.springframework.kafka.support.TopicPartitionOffset; + +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +import static cn.hippo4j.common.constant.ChangeThreadPoolConstants.CHANGE_DELIMITER; + +/** + * Kafka thread-pool adapter. + */ +@Slf4j +public class KafkaThreadPoolAdapter implements ThreadPoolAdapter, ApplicationListener { + + private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry; + + @Override + public String mark() { + return "Kafka"; + } + + @Override + public ThreadPoolAdapterState getThreadPoolState(String identify) { + ThreadPoolAdapterState result = new ThreadPoolAdapterState(); + MessageListenerContainer listenerContainer = kafkaListenerEndpointRegistry.getListenerContainer(identify); + + if (listenerContainer == null) { + log.warn("[{}] Kafka consuming thread pool not found.", identify); + return result; + } + + result.setThreadPoolKey(identify); + if (listenerContainer instanceof ConcurrentMessageListenerContainer) { + result.setCoreSize(((ConcurrentMessageListenerContainer) listenerContainer).getConcurrency()); + result.setMaximumSize(result.getCoreSize()); + } else { + result.setCoreSize(1); + result.setMaximumSize(1); + } + return result; + } + + @Override + public List getThreadPoolStates() { + List adapterStateList = new ArrayList<>(); + kafkaListenerEndpointRegistry.getListenerContainerIds().forEach(id -> adapterStateList.add(getThreadPoolState(id))); + return adapterStateList; + } + + @Override + public boolean updateThreadPool(ThreadPoolAdapterParameter threadPoolAdapterParameter) { + String threadPoolKey = threadPoolAdapterParameter.getThreadPoolKey(); + MessageListenerContainer listenerContainer = kafkaListenerEndpointRegistry.getListenerContainer(threadPoolKey); + + if (listenerContainer == null) { + log.warn("[{}] Kafka consuming thread pool not found.", threadPoolKey); + return false; + } + if (!(listenerContainer instanceof ConcurrentMessageListenerContainer)) { + log.warn("[{}] Kafka consuming thread pool not support modify.", threadPoolKey); + return false; + } + ConcurrentMessageListenerContainer concurrentContainer = (ConcurrentMessageListenerContainer) listenerContainer; + int originalCoreSize = concurrentContainer.getConcurrency(); + int originalMaximumPoolSize = originalCoreSize; + Integer concurrency = threadPoolAdapterParameter.getCorePoolSize(); + if (originalCoreSize < concurrency) { + // add consumer + if (!addConsumer(threadPoolKey, concurrentContainer, originalCoreSize, concurrency)) { + return false; + } + } else { + // stop consumer + decreaseConsumer(threadPoolKey, concurrentContainer, originalCoreSize, concurrency); + } + concurrentContainer.setConcurrency(concurrency); + log.info("[{}] Kafka consumption thread pool parameter change. coreSize: {}, maximumSize: {}", + threadPoolKey, + String.format(CHANGE_DELIMITER, originalCoreSize, concurrency), + String.format(CHANGE_DELIMITER, originalMaximumPoolSize, concurrency)); + return true; + + } + + /** + * @param threadPoolKey + * @param concurrentContainer + * @param originalCoreSize + * @param concurrency + * @since org.springframework.kafka.listener.ConcurrentMessageListenerContainer.doStop + */ + private static void decreaseConsumer(String threadPoolKey, ConcurrentMessageListenerContainer concurrentContainer, int originalCoreSize, Integer concurrency) { + int targetDecrease = originalCoreSize - concurrency; + List containers = (List) ReflectUtil.getFieldValue(concurrentContainer, "containers"); + Iterator iterator = containers.iterator(); + int count = 0; + while (iterator.hasNext() && count < targetDecrease) { + KafkaMessageListenerContainer container = iterator.next(); + if (container.isRunning()) { + container.stop(() -> { + }); + count++; + } + } + log.info("[{}] Kafka consumption change. target decrease {} ,real decrease {}", threadPoolKey, targetDecrease, count); + } + + /** + * @param threadPoolKey + * @param concurrentContainer + * @param originalCoreSize + * @param concurrency + * @return true success + * @since org.springframework.kafka.listener.ConcurrentMessageListenerContainer#doStart() + */ + private static boolean addConsumer(String threadPoolKey, ConcurrentMessageListenerContainer concurrentContainer, int originalCoreSize, Integer concurrency) { + ContainerProperties containerProperties = concurrentContainer.getContainerProperties(); + TopicPartitionOffset[] topicPartitions = containerProperties.getTopicPartitions(); + if (topicPartitions != null && concurrency > topicPartitions.length) { + log.warn("[{}] Kafka consuming thread pool not support modify. " + + "When specific partitions are provided, the concurrency must be less than or " + + "equal to the number of partitions;", threadPoolKey); + return false; + } + List containers = (List) ReflectUtil.getFieldValue(concurrentContainer, "containers"); + boolean alwaysClientIdSuffix = (Boolean) ReflectUtil.getFieldValue(concurrentContainer, "alwaysClientIdSuffix"); + int size = containers.size(); + for (int i = size; i < concurrency - originalCoreSize + size; i++) { + KafkaMessageListenerContainer container = ReflectUtil.invoke(concurrentContainer, "constructContainer", containerProperties, topicPartitions, i); + String beanName = concurrentContainer.getBeanName(); + container.setBeanName((beanName != null ? beanName : "consumer") + "-" + i); + container.setApplicationContext(ApplicationContextHolder.getInstance()); + if (concurrentContainer.getApplicationEventPublisher() != null) { + container.setApplicationEventPublisher(concurrentContainer.getApplicationEventPublisher()); + } + container.setClientIdSuffix(concurrency > 1 || alwaysClientIdSuffix ? "-" + i : ""); + container.setGenericErrorHandler(ReflectUtil.invoke(concurrentContainer, "getGenericErrorHandler")); + container.setAfterRollbackProcessor(ReflectUtil.invoke(concurrentContainer, "getAfterRollbackProcessor")); + + Method getRecordInterceptor = ReflectUtil.findDeclaredMethod(concurrentContainer.getClass(), "getRecordInterceptor", Constants.EMPTY_CLASS_ARRAY); + ReflectUtil.setAccessible(getRecordInterceptor); + container.setRecordInterceptor(ReflectUtil.invoke(concurrentContainer, getRecordInterceptor)); + + Method isInterceptBeforeTx = ReflectUtil.findDeclaredMethod(concurrentContainer.getClass(), "isInterceptBeforeTx", Constants.EMPTY_CLASS_ARRAY); + ReflectUtil.setAccessible(isInterceptBeforeTx); + container.setInterceptBeforeTx(ReflectUtil.invoke(concurrentContainer, isInterceptBeforeTx)); + + container.setEmergencyStop(() -> { + concurrentContainer.stop(() -> { + }); + ReflectUtil.invoke(concurrentContainer, "publishContainerStoppedEvent"); + }); + Method isPaused = ReflectUtil.findDeclaredMethod(concurrentContainer.getClass(), "isPaused", Constants.EMPTY_CLASS_ARRAY); + ReflectUtil.setAccessible(isPaused); + + if (ReflectUtil.invoke(concurrentContainer, isPaused)) { + container.pause(); + } + container.start(); + containers.add(container); + } + return true; + } + + @Override + public void onApplicationEvent(ApplicationStartedEvent event) { + try { + KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry = ApplicationContextHolder.getBean(KafkaListenerEndpointRegistry.class); + this.kafkaListenerEndpointRegistry = kafkaListenerEndpointRegistry; + } catch (Exception ex) { + log.error("Failed to get Kafka thread pool.", ex); + } + } +} diff --git a/hippo4j-adapter/pom.xml b/hippo4j-adapter/pom.xml index e65296d8..263f7ccd 100644 --- a/hippo4j-adapter/pom.xml +++ b/hippo4j-adapter/pom.xml @@ -17,6 +17,7 @@ hippo4j-adapter-alibaba-dubbo hippo4j-adapter-rabbitmq hippo4j-adapter-rocketmq + hippo4j-adapter-kafka hippo4j-adapter-hystrix hippo4j-adapter-spring-cloud-stream-rocketmq hippo4j-adapter-spring-cloud-stream-rabbitmq diff --git a/hippo4j-common/src/main/java/cn/hippo4j/common/toolkit/ReflectUtil.java b/hippo4j-common/src/main/java/cn/hippo4j/common/toolkit/ReflectUtil.java index 8f4fc5b4..317740ba 100644 --- a/hippo4j-common/src/main/java/cn/hippo4j/common/toolkit/ReflectUtil.java +++ b/hippo4j-common/src/main/java/cn/hippo4j/common/toolkit/ReflectUtil.java @@ -20,6 +20,7 @@ package cn.hippo4j.common.toolkit; import cn.hippo4j.common.web.exception.IllegalException; import lombok.AccessLevel; import lombok.NoArgsConstructor; +import org.springframework.cglib.core.ReflectUtils; import java.lang.reflect.AccessibleObject; import java.lang.reflect.Field; @@ -234,6 +235,28 @@ public class ReflectUtil { return null; } + /** + * Invoke. + * + * @param obj the obj + * @param methodName the method Name + * @param arguments parameters + * @return result for zhe method + */ + @SuppressWarnings("unchecked") + public static T invoke(Object obj, String methodName, Object... arguments) { + try { + Method method = ReflectUtil.getMethodByName(obj.getClass(), methodName); + if (method == null) { + throw new IllegalException(methodName + "method not exists"); + } + ReflectUtil.setAccessible(method); + return (T) method.invoke(obj, arguments); + } catch (IllegalAccessException | InvocationTargetException e) { + throw new IllegalException(e); + } + } + /** * Invoke. * @@ -283,4 +306,19 @@ public class ReflectUtil { } return null; } + + /** + * + * @param clazz + * @param methodName + * @param parameterTypes + * @return + */ + public static Method findDeclaredMethod(Class clazz, String methodName, Class[] parameterTypes) { + try { + return ReflectUtils.findDeclaredMethod(clazz, methodName, parameterTypes); + } catch (NoSuchMethodException e) { + throw new RuntimeException(e); + } + } } diff --git a/hippo4j-common/src/test/java/cn/hippo4j/common/spi/MyArrayBlockingQueue.java b/hippo4j-common/src/test/java/cn/hippo4j/common/spi/MyArrayBlockingQueue.java index 16b86e2e..47408032 100644 --- a/hippo4j-common/src/test/java/cn/hippo4j/common/spi/MyArrayBlockingQueue.java +++ b/hippo4j-common/src/test/java/cn/hippo4j/common/spi/MyArrayBlockingQueue.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package cn.hippo4j.common.spi; import cn.hippo4j.common.executor.support.CustomBlockingQueue; diff --git a/hippo4j-example/hippo4j-spring-boot-starter-adapter-kafka-example/pom.xml b/hippo4j-example/hippo4j-spring-boot-starter-adapter-kafka-example/pom.xml new file mode 100644 index 00000000..70e5d3a4 --- /dev/null +++ b/hippo4j-example/hippo4j-spring-boot-starter-adapter-kafka-example/pom.xml @@ -0,0 +1,49 @@ + + + 4.0.0 + + cn.hippo4j + hippo4j-example + ${revision} + + hippo4j-spring-boot-starter-adapter-kafka-example + + + true + + + + + org.springframework.boot + spring-boot-starter + + + org.springframework.boot + spring-boot-starter-web + + + org.projectlombok + lombok + + + cn.hippo4j + hippo4j-spring-boot-starter-adapter-kafka + ${project.version} + + + cn.hippo4j + hippo4j-spring-boot-starter + ${project.version} + + + cn.hippo4j + hippo4j-example-core + ${revision} + + + org.springframework.kafka + spring-kafka + + + diff --git a/hippo4j-example/hippo4j-spring-boot-starter-adapter-kafka-example/src/main/java/cn/hippo4j/springboot/starter/adapter/kafka/example/MessageConsume.java b/hippo4j-example/hippo4j-spring-boot-starter-adapter-kafka-example/src/main/java/cn/hippo4j/springboot/starter/adapter/kafka/example/MessageConsume.java new file mode 100644 index 00000000..c146b598 --- /dev/null +++ b/hippo4j-example/hippo4j-spring-boot-starter-adapter-kafka-example/src/main/java/cn/hippo4j/springboot/starter/adapter/kafka/example/MessageConsume.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.springboot.starter.adapter.kafka.example; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.stereotype.Service; + +/** + * Message consume. + */ +@Slf4j +@Service +public class MessageConsume { + + @KafkaListener(id = "testId1", topics = MessageProduce.TOPIC, groupId = "testGroup") + public void onMessage(String message) { + log.info("Message: {}", message); + } +} diff --git a/hippo4j-example/hippo4j-spring-boot-starter-adapter-kafka-example/src/main/java/cn/hippo4j/springboot/starter/adapter/kafka/example/MessageProduce.java b/hippo4j-example/hippo4j-spring-boot-starter-adapter-kafka-example/src/main/java/cn/hippo4j/springboot/starter/adapter/kafka/example/MessageProduce.java new file mode 100644 index 00000000..d910d9e1 --- /dev/null +++ b/hippo4j-example/hippo4j-spring-boot-starter-adapter-kafka-example/src/main/java/cn/hippo4j/springboot/starter/adapter/kafka/example/MessageProduce.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.springboot.starter.adapter.kafka.example; + +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.stereotype.Component; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RestController; + +import javax.annotation.PostConstruct; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +/** + * Message produce. + */ +@Slf4j +@Component +@RestController +@AllArgsConstructor +public class MessageProduce { + + public static final String TOPIC = "test"; + + private final KafkaTemplate template; + + @GetMapping("/message/send") + public String sendMessage() { + template.send(TOPIC, "testMessage"); + return "success"; + } + + @PostConstruct + public void init() { + Thread t = new Thread(() -> { + while (true) { + String message = UUID.randomUUID().toString(); + template.send(TOPIC, "autoTestMessage " + message); + try { + TimeUnit.SECONDS.sleep(3); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + }); + t.start(); + } +} diff --git a/hippo4j-example/hippo4j-spring-boot-starter-adapter-kafka-example/src/main/java/cn/hippo4j/springboot/starter/adapter/kafka/example/ServerAdapterKafkaExampleApplication.java b/hippo4j-example/hippo4j-spring-boot-starter-adapter-kafka-example/src/main/java/cn/hippo4j/springboot/starter/adapter/kafka/example/ServerAdapterKafkaExampleApplication.java new file mode 100644 index 00000000..08e6f069 --- /dev/null +++ b/hippo4j-example/hippo4j-spring-boot-starter-adapter-kafka-example/src/main/java/cn/hippo4j/springboot/starter/adapter/kafka/example/ServerAdapterKafkaExampleApplication.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.springboot.starter.adapter.kafka.example; + +import cn.hippo4j.core.enable.EnableDynamicThreadPool; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@EnableDynamicThreadPool +@SpringBootApplication(scanBasePackages = {"cn.hippo4j.example.core", "cn.hippo4j.springboot.starter.adapter.kafka.example"}) +public class ServerAdapterKafkaExampleApplication { + + public static void main(String[] args) { + SpringApplication.run(ServerAdapterKafkaExampleApplication.class, args); + } +} diff --git a/hippo4j-example/hippo4j-spring-boot-starter-adapter-kafka-example/src/main/resources/application.properties b/hippo4j-example/hippo4j-spring-boot-starter-adapter-kafka-example/src/main/resources/application.properties new file mode 100644 index 00000000..ed0c980e --- /dev/null +++ b/hippo4j-example/hippo4j-spring-boot-starter-adapter-kafka-example/src/main/resources/application.properties @@ -0,0 +1,13 @@ +server.port=8011 + +debug=true + +spring.application.name=hippo4j-spring-boot-starter-adapter-kafka-example +spring.dynamic.thread-pool.server-addr=http://localhost:6691 +spring.dynamic.thread-pool.namespace=prescription +spring.dynamic.thread-pool.item-id=dynamic-threadpool-example +spring.dynamic.thread-pool.username=admin +spring.dynamic.thread-pool.password=123456 + + +spring.kafka.bootstrap-servers=localhost:9093,localhost:9094,localhost:9095 \ No newline at end of file diff --git a/hippo4j-example/pom.xml b/hippo4j-example/pom.xml index 0a3a623e..04daf166 100644 --- a/hippo4j-example/pom.xml +++ b/hippo4j-example/pom.xml @@ -26,6 +26,7 @@ hippo4j-spring-boot-starter-adapter-spring-cloud-stream-rabbitmq-example hippo4j-spring-boot-starter-adapter-spring-cloud-stream-rocketmq-example hippo4j-spring-boot-starter-adapter-rocketmq-example + hippo4j-spring-boot-starter-adapter-kafka-example hippo4j-config-etcd-spring-boot-starter-example hippo4j-config-nacos-spring-boot-1x-starter-example hippo4j-config-apollo-spring-boot-1x-starter-example diff --git a/hippo4j-spring-boot/hippo4j-spring-boot-starter-adapter/hippo4j-spring-boot-starter-adapter-kafka/pom.xml b/hippo4j-spring-boot/hippo4j-spring-boot-starter-adapter/hippo4j-spring-boot-starter-adapter-kafka/pom.xml new file mode 100644 index 00000000..1de46008 --- /dev/null +++ b/hippo4j-spring-boot/hippo4j-spring-boot-starter-adapter/hippo4j-spring-boot-starter-adapter-kafka/pom.xml @@ -0,0 +1,24 @@ + + + 4.0.0 + + cn.hippo4j + hippo4j-spring-boot-starter-adapter + ${revision} + + hippo4j-spring-boot-starter-adapter-kafka + + + + cn.hippo4j + hippo4j-adapter-kafka + ${project.version} + + + org.springframework.kafka + spring-kafka + true + + + diff --git a/hippo4j-spring-boot/hippo4j-spring-boot-starter-adapter/hippo4j-spring-boot-starter-adapter-kafka/src/main/java/cn/hippo4j/springboot/starter/adapter/kafka/KafkaAdapterAutoConfiguration.java b/hippo4j-spring-boot/hippo4j-spring-boot-starter-adapter/hippo4j-spring-boot-starter-adapter-kafka/src/main/java/cn/hippo4j/springboot/starter/adapter/kafka/KafkaAdapterAutoConfiguration.java new file mode 100644 index 00000000..24107d28 --- /dev/null +++ b/hippo4j-spring-boot/hippo4j-spring-boot-starter-adapter/hippo4j-spring-boot-starter-adapter-kafka/src/main/java/cn/hippo4j/springboot/starter/adapter/kafka/KafkaAdapterAutoConfiguration.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.springboot.starter.adapter.kafka; + +import cn.hippo4j.adapter.kafka.KafkaThreadPoolAdapter; +import cn.hippo4j.common.config.ApplicationContextHolder; +import org.springframework.boot.autoconfigure.AutoConfigureAfter; +import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.core.KafkaTemplate; + +/** + * Kafka adapter auto configuration. + */ +@Configuration +@AutoConfigureAfter(KafkaAutoConfiguration.class) +public class KafkaAdapterAutoConfiguration { + + @Bean + @ConditionalOnMissingBean + public ApplicationContextHolder simpleApplicationContextHolder() { + return new ApplicationContextHolder(); + } + + @Bean + @SuppressWarnings("all") + @ConditionalOnBean(KafkaTemplate.class) + public KafkaThreadPoolAdapter kafkaThreadPoolAdapter(ApplicationContextHolder applicationContextHolder) { + return new KafkaThreadPoolAdapter(); + } +} diff --git a/hippo4j-spring-boot/hippo4j-spring-boot-starter-adapter/hippo4j-spring-boot-starter-adapter-kafka/src/main/resources/META-INF/spring.factories b/hippo4j-spring-boot/hippo4j-spring-boot-starter-adapter/hippo4j-spring-boot-starter-adapter-kafka/src/main/resources/META-INF/spring.factories new file mode 100644 index 00000000..592fba93 --- /dev/null +++ b/hippo4j-spring-boot/hippo4j-spring-boot-starter-adapter/hippo4j-spring-boot-starter-adapter-kafka/src/main/resources/META-INF/spring.factories @@ -0,0 +1 @@ +org.springframework.boot.autoconfigure.EnableAutoConfiguration=cn.hippo4j.springboot.starter.adapter.kafka.KafkaAdapterAutoConfiguration diff --git a/hippo4j-spring-boot/hippo4j-spring-boot-starter-adapter/pom.xml b/hippo4j-spring-boot/hippo4j-spring-boot-starter-adapter/pom.xml index f2c5b783..1781a251 100644 --- a/hippo4j-spring-boot/hippo4j-spring-boot-starter-adapter/pom.xml +++ b/hippo4j-spring-boot/hippo4j-spring-boot-starter-adapter/pom.xml @@ -17,6 +17,7 @@ hippo4j-spring-boot-starter-adapter-alibaba-dubbo hippo4j-spring-boot-starter-adapter-rabbitmq hippo4j-spring-boot-starter-adapter-rocketmq + hippo4j-spring-boot-starter-adapter-kafka hippo4j-spring-boot-starter-adapter-hystrix hippo4j-spring-boot-starter-adapter-spring-cloud-stream-rocketmq hippo4j-spring-boot-starter-adapter-spring-cloud-stream-rabbitmq