diff --git a/hippo4j-common/src/main/java/cn/hippo4j/common/api/ThreadPoolDynamicRefresh.java b/hippo4j-common/src/main/java/cn/hippo4j/common/api/ThreadPoolDynamicRefresh.java index 57518b56..2221044c 100644 --- a/hippo4j-common/src/main/java/cn/hippo4j/common/api/ThreadPoolDynamicRefresh.java +++ b/hippo4j-common/src/main/java/cn/hippo4j/common/api/ThreadPoolDynamicRefresh.java @@ -17,6 +17,8 @@ package cn.hippo4j.common.api; +import java.util.Map; + /** * Thread-pool dynamic refresh. */ @@ -28,4 +30,13 @@ public interface ThreadPoolDynamicRefresh { * @param content */ void dynamicRefresh(String content); + + /** + * Dynamic refresh. + * + * @param content + * @param newValueChangeMap + */ + default void dynamicRefresh(String content, Map newValueChangeMap) { + } } diff --git a/hippo4j-core/pom.xml b/hippo4j-core/pom.xml index ea8fe3be..0427ee24 100644 --- a/hippo4j-core/pom.xml +++ b/hippo4j-core/pom.xml @@ -35,24 +35,28 @@ + + org.apache.tomcat.embed + tomcat-embed-core + ${tomcat-embed-core.version} + true + + org.springframework.boot spring-boot-starter-tomcat - compile true org.springframework.boot spring-boot-starter-jetty - compile true org.springframework.boot spring-boot-starter-undertow - compile true diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/state/AbstractThreadPoolRuntime.java b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/state/AbstractThreadPoolRuntime.java index e1b5330c..9ad71974 100644 --- a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/state/AbstractThreadPoolRuntime.java +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/state/AbstractThreadPoolRuntime.java @@ -43,7 +43,7 @@ public abstract class AbstractThreadPoolRuntime { * @param threadPoolRunStateInfo * @return */ - protected abstract ThreadPoolRunStateInfo supplement(ThreadPoolRunStateInfo threadPoolRunStateInfo); + public abstract ThreadPoolRunStateInfo supplement(ThreadPoolRunStateInfo threadPoolRunStateInfo); /** * Get pool run state. diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/state/ThreadPoolRunStateHandler.java b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/state/ThreadPoolRunStateHandler.java index 5eefb49b..e50f07dd 100644 --- a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/state/ThreadPoolRunStateHandler.java +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/state/ThreadPoolRunStateHandler.java @@ -51,7 +51,7 @@ public class ThreadPoolRunStateHandler extends AbstractThreadPoolRuntime { private final ConfigurableEnvironment environment; @Override - protected ThreadPoolRunStateInfo supplement(ThreadPoolRunStateInfo poolRunStateInfo) { + public ThreadPoolRunStateInfo supplement(ThreadPoolRunStateInfo poolRunStateInfo) { RuntimeInfo runtimeInfo = new RuntimeInfo(); String memoryProportion = StrUtil.builder( "已分配: ", diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/web/TomcatWebThreadPoolHandler.java b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/web/TomcatWebThreadPoolHandler.java index a002930f..44b766c7 100644 --- a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/web/TomcatWebThreadPoolHandler.java +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/web/TomcatWebThreadPoolHandler.java @@ -22,11 +22,14 @@ import cn.hippo4j.common.model.ThreadPoolParameter; import cn.hippo4j.common.model.ThreadPoolParameterInfo; import cn.hippo4j.common.model.ThreadPoolRunStateInfo; import cn.hippo4j.core.executor.state.AbstractThreadPoolRuntime; +import cn.hippo4j.core.toolkit.CalculateUtil; +import cn.hutool.core.date.DateUtil; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.boot.web.embedded.tomcat.TomcatWebServer; import org.springframework.boot.web.server.WebServer; +import java.util.Date; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; @@ -68,36 +71,58 @@ public class TomcatWebThreadPoolHandler extends AbstractWebThreadPoolService { @Override public ThreadPoolBaseInfo simpleInfo() { ThreadPoolBaseInfo poolBaseInfo = new ThreadPoolBaseInfo(); - ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executor; - int corePoolSize = threadPoolExecutor.getCorePoolSize(); - int maximumPoolSize = threadPoolExecutor.getMaximumPoolSize(); - RejectedExecutionHandler rejectedExecutionHandler = threadPoolExecutor.getRejectedExecutionHandler(); - long keepAliveTime = threadPoolExecutor.getKeepAliveTime(TimeUnit.SECONDS); - - BlockingQueue queue = threadPoolExecutor.getQueue(); - int queueSize = queue.size(); - int remainingCapacity = queue.remainingCapacity(); - int queueCapacity = queueSize + remainingCapacity; - + int corePoolSize, maximumPoolSize, queueCapacity; + long keepAliveTime; + String rejectedExecutionHandlerName; + BlockingQueue blockingQueue; + if (executor instanceof ThreadPoolExecutor) { + ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executor; + corePoolSize = threadPoolExecutor.getCorePoolSize(); + maximumPoolSize = threadPoolExecutor.getMaximumPoolSize(); + keepAliveTime = threadPoolExecutor.getKeepAliveTime(TimeUnit.SECONDS); + blockingQueue = threadPoolExecutor.getQueue(); + int queueSize = blockingQueue.size(); + int remainingCapacity = blockingQueue.remainingCapacity(); + queueCapacity = queueSize + remainingCapacity; + RejectedExecutionHandler rejectedExecutionHandler = threadPoolExecutor.getRejectedExecutionHandler(); + rejectedExecutionHandlerName = rejectedExecutionHandler.getClass().getSimpleName(); + } else { + org.apache.tomcat.util.threads.ThreadPoolExecutor tomcatThreadPoolExecutor = (org.apache.tomcat.util.threads.ThreadPoolExecutor) executor; + corePoolSize = tomcatThreadPoolExecutor.getCorePoolSize(); + maximumPoolSize = tomcatThreadPoolExecutor.getMaximumPoolSize(); + keepAliveTime = tomcatThreadPoolExecutor.getKeepAliveTime(TimeUnit.SECONDS); + blockingQueue = tomcatThreadPoolExecutor.getQueue(); + int queueSize = blockingQueue.size(); + int remainingCapacity = blockingQueue.remainingCapacity(); + queueCapacity = queueSize + remainingCapacity; + rejectedExecutionHandlerName = tomcatThreadPoolExecutor.getRejectedExecutionHandler().getClass().getSimpleName(); + } poolBaseInfo.setCoreSize(corePoolSize); poolBaseInfo.setMaximumSize(maximumPoolSize); poolBaseInfo.setKeepAliveTime(keepAliveTime); - poolBaseInfo.setQueueType(queue.getClass().getSimpleName()); + poolBaseInfo.setQueueType(blockingQueue.getClass().getSimpleName()); poolBaseInfo.setQueueCapacity(queueCapacity); - poolBaseInfo.setRejectedName(rejectedExecutionHandler.getClass().getSimpleName()); + poolBaseInfo.setRejectedName(rejectedExecutionHandlerName); return poolBaseInfo; } @Override public ThreadPoolParameter getWebThreadPoolParameter() { - ThreadPoolParameterInfo parameterInfo = null; + ThreadPoolParameterInfo parameterInfo = new ThreadPoolParameterInfo(); + int minThreads, maxThreads; + long keepAliveTime; try { - parameterInfo = new ThreadPoolParameterInfo(); - ThreadPoolExecutor tomcatExecutor = (ThreadPoolExecutor) executor; - int minThreads = tomcatExecutor.getCorePoolSize(); - int maxThreads = tomcatExecutor.getMaximumPoolSize(); - long keepAliveTime = tomcatExecutor.getKeepAliveTime(TimeUnit.SECONDS); - + if (executor instanceof ThreadPoolExecutor) { + ThreadPoolExecutor tomcatExecutor = (ThreadPoolExecutor) executor; + minThreads = tomcatExecutor.getCorePoolSize(); + maxThreads = tomcatExecutor.getMaximumPoolSize(); + keepAliveTime = tomcatExecutor.getKeepAliveTime(TimeUnit.SECONDS); + } else { + org.apache.tomcat.util.threads.ThreadPoolExecutor tomcatThreadPoolExecutor = (org.apache.tomcat.util.threads.ThreadPoolExecutor) executor; + minThreads = tomcatThreadPoolExecutor.getCorePoolSize(); + maxThreads = tomcatThreadPoolExecutor.getMaximumPoolSize(); + keepAliveTime = tomcatThreadPoolExecutor.getKeepAliveTime(TimeUnit.SECONDS); + } parameterInfo.setCoreSize(minThreads); parameterInfo.setMaxSize(maxThreads); parameterInfo.setKeepAliveTime((int) keepAliveTime); @@ -109,19 +134,75 @@ public class TomcatWebThreadPoolHandler extends AbstractWebThreadPoolService { @Override public ThreadPoolRunStateInfo getWebRunStateInfo() { - return webThreadPoolRunStateHandler.getPoolRunState(null, executor); + if (executor instanceof ThreadPoolExecutor) { + return webThreadPoolRunStateHandler.getPoolRunState(null, executor); + } + ThreadPoolRunStateInfo runStateInfo = new ThreadPoolRunStateInfo(); + org.apache.tomcat.util.threads.ThreadPoolExecutor tomcatThreadPoolExecutor = (org.apache.tomcat.util.threads.ThreadPoolExecutor) executor; + // 核心线程数 + int corePoolSize = tomcatThreadPoolExecutor.getCorePoolSize(); + // 最大线程数 + int maximumPoolSize = tomcatThreadPoolExecutor.getMaximumPoolSize(); + // 线程池当前线程数 (有锁) + int poolSize = tomcatThreadPoolExecutor.getPoolSize(); + // 活跃线程数 (有锁) + int activeCount = tomcatThreadPoolExecutor.getActiveCount(); + // 同时进入池中的最大线程数 (有锁) + int largestPoolSize = tomcatThreadPoolExecutor.getLargestPoolSize(); + // 线程池中执行任务总数量 (有锁) + long completedTaskCount = tomcatThreadPoolExecutor.getCompletedTaskCount(); + // 当前负载 + String currentLoad = CalculateUtil.divide(activeCount, maximumPoolSize) + ""; + // 峰值负载 + String peakLoad = CalculateUtil.divide(largestPoolSize, maximumPoolSize) + ""; + BlockingQueue queue = tomcatThreadPoolExecutor.getQueue(); + // 队列元素个数 + int queueSize = queue.size(); + // 队列类型 + String queueType = queue.getClass().getSimpleName(); + // 队列剩余容量 + int remainingCapacity = queue.remainingCapacity(); + // 队列容量 + int queueCapacity = queueSize + remainingCapacity; + runStateInfo.setCoreSize(corePoolSize); + runStateInfo.setPoolSize(poolSize); + runStateInfo.setMaximumSize(maximumPoolSize); + runStateInfo.setActiveSize(activeCount); + runStateInfo.setCurrentLoad(currentLoad); + runStateInfo.setPeakLoad(peakLoad); + runStateInfo.setQueueType(queueType); + runStateInfo.setQueueSize(queueSize); + runStateInfo.setQueueCapacity(queueCapacity); + runStateInfo.setQueueRemainingCapacity(remainingCapacity); + runStateInfo.setLargestPoolSize(largestPoolSize); + runStateInfo.setCompletedTaskCount(completedTaskCount); + runStateInfo.setClientLastRefreshTime(DateUtil.formatDateTime(new Date())); + runStateInfo.setTimestamp(System.currentTimeMillis()); + return webThreadPoolRunStateHandler.supplement(runStateInfo); } @Override public void updateWebThreadPool(ThreadPoolParameterInfo threadPoolParameterInfo) { + int originalCoreSize, originalMaximumPoolSize; + long originalKeepAliveTime; try { - ThreadPoolExecutor tomcatExecutor = (ThreadPoolExecutor) executor; - int originalCoreSize = tomcatExecutor.getCorePoolSize(); - int originalMaximumPoolSize = tomcatExecutor.getMaximumPoolSize(); - long originalKeepAliveTime = tomcatExecutor.getKeepAliveTime(TimeUnit.SECONDS); - tomcatExecutor.setCorePoolSize(threadPoolParameterInfo.corePoolSizeAdapt()); - tomcatExecutor.setMaximumPoolSize(threadPoolParameterInfo.maximumPoolSizeAdapt()); - tomcatExecutor.setKeepAliveTime(threadPoolParameterInfo.getKeepAliveTime(), TimeUnit.SECONDS); + if (executor instanceof ThreadPoolExecutor) { + ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executor; + originalCoreSize = threadPoolExecutor.getCorePoolSize(); + originalMaximumPoolSize = threadPoolExecutor.getMaximumPoolSize(); + originalKeepAliveTime = threadPoolExecutor.getKeepAliveTime(TimeUnit.SECONDS); + threadPoolExecutor.setCorePoolSize(threadPoolParameterInfo.corePoolSizeAdapt()); + threadPoolExecutor.setMaximumPoolSize(threadPoolParameterInfo.maximumPoolSizeAdapt()); + threadPoolExecutor.setKeepAliveTime(threadPoolParameterInfo.getKeepAliveTime(), TimeUnit.SECONDS); + } else { + org.apache.tomcat.util.threads.ThreadPoolExecutor tomcatThreadPoolExecutor = (org.apache.tomcat.util.threads.ThreadPoolExecutor) executor; + originalCoreSize = tomcatThreadPoolExecutor.getCorePoolSize(); + originalMaximumPoolSize = tomcatThreadPoolExecutor.getMaximumPoolSize(); + originalKeepAliveTime = tomcatThreadPoolExecutor.getKeepAliveTime(TimeUnit.SECONDS); + tomcatThreadPoolExecutor.setCorePoolSize(threadPoolParameterInfo.corePoolSizeAdapt()); + tomcatThreadPoolExecutor.setMaximumPoolSize(threadPoolParameterInfo.maximumPoolSizeAdapt()); + tomcatThreadPoolExecutor.setKeepAliveTime(threadPoolParameterInfo.getKeepAliveTime(), TimeUnit.SECONDS); + } log.info("[TOMCAT] Changed web thread pool. corePoolSize :: [{}], maximumPoolSize :: [{}], keepAliveTime :: [{}]", String.format(CHANGE_DELIMITER, originalCoreSize, threadPoolParameterInfo.corePoolSizeAdapt()), String.format(CHANGE_DELIMITER, originalMaximumPoolSize, threadPoolParameterInfo.maximumPoolSizeAdapt()), diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/web/WebThreadPoolRunStateHandler.java b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/web/WebThreadPoolRunStateHandler.java index d7ef84b9..3a5c633b 100644 --- a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/web/WebThreadPoolRunStateHandler.java +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/web/WebThreadPoolRunStateHandler.java @@ -34,7 +34,7 @@ import lombok.extern.slf4j.Slf4j; public class WebThreadPoolRunStateHandler extends AbstractThreadPoolRuntime { @Override - protected ThreadPoolRunStateInfo supplement(ThreadPoolRunStateInfo poolRunStateInfo) { + public ThreadPoolRunStateInfo supplement(ThreadPoolRunStateInfo poolRunStateInfo) { RuntimeInfo runtimeInfo = new RuntimeInfo(); String memoryProportion = StrUtil.builder( "已分配: ", diff --git a/hippo4j-example/hippo4j-core-apollo-spring-boot-starter-example/pom.xml b/hippo4j-example/hippo4j-core-apollo-spring-boot-starter-example/pom.xml index 735494e3..b8659033 100644 --- a/hippo4j-example/hippo4j-core-apollo-spring-boot-starter-example/pom.xml +++ b/hippo4j-example/hippo4j-core-apollo-spring-boot-starter-example/pom.xml @@ -47,7 +47,7 @@ com.ctrip.framework.apollo apollo-client - 1.3.0 + ${apollo.version} @@ -56,6 +56,12 @@ 2.2.5.RELEASE + + org.slf4j + slf4j-api + 1.7.21 + + io.micrometer micrometer-registry-prometheus diff --git a/hippo4j-example/hippo4j-spring-boot-starter-adapter-kafka-example/pom.xml b/hippo4j-example/hippo4j-spring-boot-starter-adapter-kafka-example/pom.xml new file mode 100644 index 00000000..40bcca10 --- /dev/null +++ b/hippo4j-example/hippo4j-spring-boot-starter-adapter-kafka-example/pom.xml @@ -0,0 +1,56 @@ + + + 4.0.0 + + cn.hippo4j + hippo4j-example + ${revision} + + hippo4j-spring-boot-starter-adapter-kafka-example + + + + org.springframework.boot + spring-boot-starter + + + + org.springframework.boot + spring-boot-starter-web + + + + org.springframework.boot + spring-boot-starter-json + + + + org.projectlombok + lombok + + + + cn.hippo4j + hippo4j-spring-boot-starter-adapter-kafka + ${project.version} + + + + cn.hippo4j + hippo4j-spring-boot-starter + ${project.version} + + + + cn.hippo4j + hippo4j-example-core + ${revision} + + + + org.springframework.kafka + spring-kafka + + + diff --git a/hippo4j-example/hippo4j-spring-boot-starter-adapter-kafka-example/src/main/java/cn/hippo4j/springboot/starter/adapter/kafka/example/Hippo4jAdapterKafkaExampleApplication.java b/hippo4j-example/hippo4j-spring-boot-starter-adapter-kafka-example/src/main/java/cn/hippo4j/springboot/starter/adapter/kafka/example/Hippo4jAdapterKafkaExampleApplication.java new file mode 100644 index 00000000..0cf6e45c --- /dev/null +++ b/hippo4j-example/hippo4j-spring-boot-starter-adapter-kafka-example/src/main/java/cn/hippo4j/springboot/starter/adapter/kafka/example/Hippo4jAdapterKafkaExampleApplication.java @@ -0,0 +1,14 @@ +package cn.hippo4j.springboot.starter.adapter.kafka.example; + +import cn.hippo4j.core.enable.EnableDynamicThreadPool; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@SpringBootApplication +@EnableDynamicThreadPool +public class Hippo4jAdapterKafkaExampleApplication { + + public static void main(String[] args) { + SpringApplication.run(Hippo4jAdapterKafkaExampleApplication.class, args); + } +} diff --git a/hippo4j-example/hippo4j-spring-boot-starter-adapter-kafka-example/src/main/java/cn/hippo4j/springboot/starter/adapter/kafka/example/consumer/KafkaMessageConsumer.java b/hippo4j-example/hippo4j-spring-boot-starter-adapter-kafka-example/src/main/java/cn/hippo4j/springboot/starter/adapter/kafka/example/consumer/KafkaMessageConsumer.java new file mode 100644 index 00000000..8f6a5821 --- /dev/null +++ b/hippo4j-example/hippo4j-spring-boot-starter-adapter-kafka-example/src/main/java/cn/hippo4j/springboot/starter/adapter/kafka/example/consumer/KafkaMessageConsumer.java @@ -0,0 +1,26 @@ +package cn.hippo4j.springboot.starter.adapter.kafka.example.consumer; + +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.support.Acknowledgment; +import org.springframework.kafka.support.KafkaHeaders; +import org.springframework.messaging.handler.annotation.Header; +import org.springframework.stereotype.Component; + +import java.util.Optional; + +/** + * Kafka message consumer. + */ +@Slf4j +@Component +public class KafkaMessageConsumer { + + @KafkaListener(topics = "kafka_message_hippo4j", groupId = "hippo4j") + public void onMessage(ConsumerRecord record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) { + Optional message = Optional.ofNullable(record.value()); + message.ifPresent(each -> log.info(each.toString())); + ack.acknowledge(); + } +} diff --git a/hippo4j-example/hippo4j-spring-boot-starter-adapter-kafka-example/src/main/java/cn/hippo4j/springboot/starter/adapter/kafka/example/produce/KafkaMessageProduce.java b/hippo4j-example/hippo4j-spring-boot-starter-adapter-kafka-example/src/main/java/cn/hippo4j/springboot/starter/adapter/kafka/example/produce/KafkaMessageProduce.java new file mode 100644 index 00000000..119fb92c --- /dev/null +++ b/hippo4j-example/hippo4j-spring-boot-starter-adapter-kafka-example/src/main/java/cn/hippo4j/springboot/starter/adapter/kafka/example/produce/KafkaMessageProduce.java @@ -0,0 +1,39 @@ +package cn.hippo4j.springboot.starter.adapter.kafka.example.produce; + +import cn.hippo4j.common.toolkit.JSONUtil; +import cn.hippo4j.example.core.dto.SendMessageDTO; +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.stereotype.Component; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RestController; + +import java.util.UUID; + +/** + * Kafka message produce. + */ +@Slf4j +@Component +@RestController +@AllArgsConstructor +public class KafkaMessageProduce { + + private final KafkaTemplate kafkaTemplate; + + private final String TOPIC = "kafka_message_hippo4j"; + + @GetMapping("/message/send") + public String sendMessage(Integer count) { + for (int i = 0; i < count; i++) { + String keys = UUID.randomUUID().toString(); + SendMessageDTO payload = SendMessageDTO.builder() + .receiver("156011xxx91") + .uid(keys) + .build(); + kafkaTemplate.send(TOPIC, JSONUtil.toJSONString(payload)); + } + return "success"; + } +} diff --git a/hippo4j-example/hippo4j-spring-boot-starter-adapter-kafka-example/src/main/resources/application.properties b/hippo4j-example/hippo4j-spring-boot-starter-adapter-kafka-example/src/main/resources/application.properties new file mode 100644 index 00000000..fe298e76 --- /dev/null +++ b/hippo4j-example/hippo4j-spring-boot-starter-adapter-kafka-example/src/main/resources/application.properties @@ -0,0 +1,23 @@ +server.port=8092 + +spring.profiles.active=dev +spring.dynamic.thread-pool.server-addr=http://localhost:6691 +spring.dynamic.thread-pool.namespace=prescription +spring.dynamic.thread-pool.item-id=dynamic-threadpool-example +spring.dynamic.thread-pool.username=admin +spring.dynamic.thread-pool.password=123456 + +spring.kafka.bootstrap-servers=127.0.0.1:9092 +spring.kafka.producer.retries=0 +spring.kafka.producer.batch-size=16384 +spring.kafka.producer.buffer-memory=33554432 +spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer +spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer +spring.kafka.producer.acks=1 +spring.kafka.consumer.auto-offset-reset=latest +spring.kafka.consumer.enable-auto-commit=false +spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer +spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer +spring.kafka.listener.concurrency=2 +spring.kafka.listener.ack-mode=manual_immediate +spring.kafka.listener.missing-topics-fatal=false diff --git a/hippo4j-example/hippo4j-spring-boot-starter-example/pom.xml b/hippo4j-example/hippo4j-spring-boot-starter-example/pom.xml index 69afc946..ef85459f 100644 --- a/hippo4j-example/hippo4j-spring-boot-starter-example/pom.xml +++ b/hippo4j-example/hippo4j-spring-boot-starter-example/pom.xml @@ -33,6 +33,12 @@ spring-boot-starter-web + + org.apache.tomcat.embed + tomcat-embed-core + ${tomcat-embed-core.version} + +