diff --git a/hippo4j-adapter/hippo4j-adapter-rocketmq/src/main/java/cn/hippo4j/adapter/rocketmq/RocketMQThreadPoolAdapter.java b/hippo4j-adapter/hippo4j-adapter-rocketmq/src/main/java/cn/hippo4j/adapter/rocketmq/RocketMQThreadPoolAdapter.java index a00f3416..bb3517f1 100644 --- a/hippo4j-adapter/hippo4j-adapter-rocketmq/src/main/java/cn/hippo4j/adapter/rocketmq/RocketMQThreadPoolAdapter.java +++ b/hippo4j-adapter/hippo4j-adapter-rocketmq/src/main/java/cn/hippo4j/adapter/rocketmq/RocketMQThreadPoolAdapter.java @@ -20,9 +20,21 @@ package cn.hippo4j.adapter.rocketmq; import cn.hippo4j.adapter.base.ThreadPoolAdapter; import cn.hippo4j.adapter.base.ThreadPoolAdapterParameter; import cn.hippo4j.adapter.base.ThreadPoolAdapterState; +import cn.hippo4j.common.config.ApplicationContextHolder; +import cn.hippo4j.common.toolkit.ReflectUtil; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.client.impl.consumer.ConsumeMessageService; +import org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer; import org.springframework.boot.context.event.ApplicationStartedEvent; import org.springframework.context.ApplicationListener; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ThreadPoolExecutor; + +import static cn.hippo4j.common.constant.ChangeThreadPoolConstants.CHANGE_DELIMITER; /** * RocketMQ thread-pool adapter. @@ -30,6 +42,8 @@ import org.springframework.context.ApplicationListener; @Slf4j public class RocketMQThreadPoolAdapter implements ThreadPoolAdapter, ApplicationListener { + private final Map ROCKET_MQ_CONSUME_EXECUTOR = Maps.newHashMap(); + @Override public String mark() { return "RocketMQ"; @@ -37,16 +51,61 @@ public class RocketMQThreadPoolAdapter implements ThreadPoolAdapter, Application @Override public ThreadPoolAdapterState getThreadPoolState(String identify) { - return null; + ThreadPoolAdapterState result = new ThreadPoolAdapterState(); + ThreadPoolExecutor rocketMQConsumeExecutor = ROCKET_MQ_CONSUME_EXECUTOR.get(identify); + if (rocketMQConsumeExecutor != null) { + result.setThreadPoolKey(identify); + result.setCoreSize(rocketMQConsumeExecutor.getCorePoolSize()); + result.setMaximumSize(rocketMQConsumeExecutor.getMaximumPoolSize()); + return result; + } + log.warn("[{}] RocketMQ consuming thread pool not found.", identify); + return result; + } + + @Override + public List getThreadPoolStates() { + List adapterStateList = Lists.newArrayList(); + ROCKET_MQ_CONSUME_EXECUTOR.forEach( + (key, val) -> adapterStateList.add(getThreadPoolState(key))); + return adapterStateList; } @Override public boolean updateThreadPool(ThreadPoolAdapterParameter threadPoolAdapterParameter) { + String threadPoolKey = threadPoolAdapterParameter.getThreadPoolKey(); + ThreadPoolExecutor rocketMQConsumeExecutor = ROCKET_MQ_CONSUME_EXECUTOR.get(threadPoolKey); + if (rocketMQConsumeExecutor != null) { + int originalCoreSize = rocketMQConsumeExecutor.getCorePoolSize(); + int originalMaximumPoolSize = rocketMQConsumeExecutor.getMaximumPoolSize(); + rocketMQConsumeExecutor.setCorePoolSize(threadPoolAdapterParameter.getCorePoolSize()); + rocketMQConsumeExecutor.setMaximumPoolSize(threadPoolAdapterParameter.getMaximumPoolSize()); + log.info("[{}] RocketMQ consumption thread pool parameter change. coreSize :: {}, maximumSize :: {}", + threadPoolKey, + String.format(CHANGE_DELIMITER, originalCoreSize, rocketMQConsumeExecutor.getCorePoolSize()), + String.format(CHANGE_DELIMITER, originalMaximumPoolSize, rocketMQConsumeExecutor.getMaximumPoolSize())); + return true; + } + log.warn("[{}] RocketMQ consuming thread pool not found.", threadPoolKey); return false; } @Override public void onApplicationEvent(ApplicationStartedEvent event) { + Map containerMap = + ApplicationContextHolder.getBeansOfType(DefaultRocketMQListenerContainer.class); + try { + for (DefaultRocketMQListenerContainer container : containerMap.values()) { + DefaultMQPushConsumer consumer = container.getConsumer(); + if (consumer != null) { + ConsumeMessageService consumeMessageService = consumer.getDefaultMQPushConsumerImpl().getConsumeMessageService(); + ThreadPoolExecutor consumeExecutor = (ThreadPoolExecutor) ReflectUtil.getFieldValue(consumeMessageService, "consumeExecutor"); + ROCKET_MQ_CONSUME_EXECUTOR.put(container.getConsumerGroup(), consumeExecutor); + } + } + } catch (Exception ex) { + log.error("Failed to get RocketMQ thread pool.", ex); + } } } diff --git a/hippo4j-example/hippo4j-spring-boot-starter-adapter-rocketmq-example/pom.xml b/hippo4j-example/hippo4j-spring-boot-starter-adapter-rocketmq-example/pom.xml new file mode 100644 index 00000000..82396531 --- /dev/null +++ b/hippo4j-example/hippo4j-spring-boot-starter-adapter-rocketmq-example/pom.xml @@ -0,0 +1,57 @@ + + + 4.0.0 + + cn.hippo4j + hippo4j-example + ${revision} + + hippo4j-spring-boot-starter-adapter-rocketmq-example + + + true + + + + + org.springframework.boot + spring-boot-starter + + + + org.springframework.boot + spring-boot-starter-web + + + + org.springframework.boot + spring-boot-starter-json + + + + org.projectlombok + lombok + + + + cn.hippo4j + hippo4j-spring-boot-starter-adapter-rocketmq + ${project.version} + + + + cn.hippo4j + hippo4j-spring-boot-starter + ${project.version} + + + + cn.hippo4j + hippo4j-example-core + ${revision} + + + + \ No newline at end of file diff --git a/hippo4j-example/hippo4j-spring-boot-starter-adapter-rocketmq-example/src/main/java/cn/hippo4j/springboot/starter/adapter/rocketmq/example/Hippo4jAdapterRocketMQExampleApplication.java b/hippo4j-example/hippo4j-spring-boot-starter-adapter-rocketmq-example/src/main/java/cn/hippo4j/springboot/starter/adapter/rocketmq/example/Hippo4jAdapterRocketMQExampleApplication.java new file mode 100644 index 00000000..00b4348c --- /dev/null +++ b/hippo4j-example/hippo4j-spring-boot-starter-adapter-rocketmq-example/src/main/java/cn/hippo4j/springboot/starter/adapter/rocketmq/example/Hippo4jAdapterRocketMQExampleApplication.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.springboot.starter.adapter.rocketmq.example; + +import cn.hippo4j.core.enable.EnableDynamicThreadPool; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@EnableDynamicThreadPool +@SpringBootApplication(scanBasePackages = {"cn.hippo4j.example.core", "cn.hippo4j.springboot.starter.adapter.rocketmq.example"}) +public class Hippo4jAdapterRocketMQExampleApplication { + + public static void main(String[] args) { + SpringApplication.run(Hippo4jAdapterRocketMQExampleApplication.class, args); + } +} diff --git a/hippo4j-example/hippo4j-spring-boot-starter-adapter-rocketmq-example/src/main/java/cn/hippo4j/springboot/starter/adapter/rocketmq/example/MessageConsume.java b/hippo4j-example/hippo4j-spring-boot-starter-adapter-rocketmq-example/src/main/java/cn/hippo4j/springboot/starter/adapter/rocketmq/example/MessageConsume.java new file mode 100644 index 00000000..a4d0041f --- /dev/null +++ b/hippo4j-example/hippo4j-spring-boot-starter-adapter-rocketmq-example/src/main/java/cn/hippo4j/springboot/starter/adapter/rocketmq/example/MessageConsume.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.springboot.starter.adapter.rocketmq.example; + +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; +import org.apache.rocketmq.spring.core.RocketMQListener; +import org.springframework.stereotype.Service; + +/** + * Message consume. + */ +@Slf4j +@Service +@RocketMQMessageListener(topic = "test-topic", consumerGroup = "my-consumer_test-topic", consumeThreadNumber = 2) +public class MessageConsume implements RocketMQListener { + + @Override + public void onMessage(String message) { + log.info("Message: {}", message); + } +} diff --git a/hippo4j-example/hippo4j-spring-boot-starter-adapter-rocketmq-example/src/main/java/cn/hippo4j/springboot/starter/adapter/rocketmq/example/MessageProduce.java b/hippo4j-example/hippo4j-spring-boot-starter-adapter-rocketmq-example/src/main/java/cn/hippo4j/springboot/starter/adapter/rocketmq/example/MessageProduce.java new file mode 100644 index 00000000..475ecff8 --- /dev/null +++ b/hippo4j-example/hippo4j-spring-boot-starter-adapter-rocketmq-example/src/main/java/cn/hippo4j/springboot/starter/adapter/rocketmq/example/MessageProduce.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.springboot.starter.adapter.rocketmq.example; + +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.core.RocketMQTemplate; +import org.springframework.stereotype.Component; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RestController; +import java.util.Date; + +/** + * Message produce. + */ +@Slf4j +@Component +@RestController +@AllArgsConstructor +public class MessageProduce { + + private final RocketMQTemplate rocketMQTemplate; + + @GetMapping("/message/send") + public String sendMessage() { + rocketMQTemplate.convertAndSend("test-topic", new Date().toString()); + return "success"; + } +} diff --git a/hippo4j-example/hippo4j-spring-boot-starter-adapter-rocketmq-example/src/main/resources/application.properties b/hippo4j-example/hippo4j-spring-boot-starter-adapter-rocketmq-example/src/main/resources/application.properties new file mode 100644 index 00000000..2d106599 --- /dev/null +++ b/hippo4j-example/hippo4j-spring-boot-starter-adapter-rocketmq-example/src/main/resources/application.properties @@ -0,0 +1,11 @@ +server.port=8099 + +spring.profiles.active=dev +spring.dynamic.thread-pool.server-addr=http://localhost:6691 +spring.dynamic.thread-pool.namespace=prescription +spring.dynamic.thread-pool.item-id=dynamic-threadpool-example +spring.dynamic.thread-pool.username=admin +spring.dynamic.thread-pool.password=123456 + +rocketmq.nameServer=127.0.0.1:9876 +rocketmq.producer.group=my-producer diff --git a/hippo4j-example/pom.xml b/hippo4j-example/pom.xml index 13079c7c..9e03a019 100644 --- a/hippo4j-example/pom.xml +++ b/hippo4j-example/pom.xml @@ -22,6 +22,7 @@ hippo4j-core-zookeeper-spring-boot-starter-example hippo4j-spring-boot-starter-adapter-rabbitmq-example hippo4j-spring-boot-starter-adapter-spring-cloud-stream-rocketmq-example + hippo4j-spring-boot-starter-adapter-rocketmq-example