From 1d5c0e8e2f9468e73c1460852124a05cd9226a94 Mon Sep 17 00:00:00 2001 From: Administrator Date: Mon, 22 May 2023 18:32:02 +0800 Subject: [PATCH] =?UTF-8?q?=E9=83=A8=E7=BD=B2Linux?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pom.xml | 27 ++++-- .../config/RabbitMQThreadPoolConfig.java | 81 ++++++++--------- .../mashibing/config/ThreadPoolConfig.java | 39 +++++++- .../controller/Hippo4jController.java | 32 +++++++ .../java/com/mashibing/mq/TestListener.java | 35 ++++---- .../com/mashibing/service/Hippo4jService.java | 11 +++ .../service/impl/Hippo4jServiceImpl.java | 89 +++++++++++++++++++ src/main/resources/application-dev.yml | 16 ++++ src/main/resources/application-test.yml | 10 +++ src/main/resources/application.yml | 17 +--- 10 files changed, 266 insertions(+), 91 deletions(-) create mode 100644 src/main/java/com/mashibing/controller/Hippo4jController.java create mode 100644 src/main/java/com/mashibing/service/Hippo4jService.java create mode 100644 src/main/java/com/mashibing/service/impl/Hippo4jServiceImpl.java create mode 100644 src/main/resources/application-dev.yml create mode 100644 src/main/resources/application-test.yml diff --git a/pom.xml b/pom.xml index 201be8b..9922e0f 100644 --- a/pom.xml +++ b/pom.xml @@ -24,17 +24,26 @@ hippo4j-spring-boot-starter 1.5.0 - - org.springframework.boot - spring-boot-starter-amqp - - - cn.hippo4j - hippo4j-spring-boot-starter-adapter-rabbitmq - 1.5.0 - + + + + + + + + + + + + + org.springframework.boot + spring-boot-maven-plugin + + + + \ No newline at end of file diff --git a/src/main/java/com/mashibing/config/RabbitMQThreadPoolConfig.java b/src/main/java/com/mashibing/config/RabbitMQThreadPoolConfig.java index c43bda1..ef6af6b 100644 --- a/src/main/java/com/mashibing/config/RabbitMQThreadPoolConfig.java +++ b/src/main/java/com/mashibing/config/RabbitMQThreadPoolConfig.java @@ -1,46 +1,35 @@ -package com.mashibing.config; - -import org.springframework.amqp.rabbit.config.AbstractRabbitListenerContainerFactory; -import org.springframework.amqp.rabbit.config.DirectRabbitListenerContainerFactory; -import org.springframework.amqp.rabbit.connection.AbstractConnectionFactory; -import org.springframework.amqp.rabbit.connection.ConnectionFactory; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; - -/** - * @author zjw - * @description - */ -@Configuration -public class RabbitMQThreadPoolConfig { - - /** - * 构建消费者要用到的线程 - * @return - */ - @Bean - public ThreadPoolTaskExecutor rabbitThreadPool(){ - ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); - executor.setCorePoolSize(5); - executor.setMaxPoolSize(5); - executor.setQueueCapacity(100); - executor.setThreadNamePrefix("rabbit-"); - return executor; - } - - /** - * 构建容器工厂,将线程池设置进入 - * @param connectionFactory - * @return - */ - @Bean - public AbstractRabbitListenerContainerFactory defaultContainerFactory(ThreadPoolTaskExecutor rabbitThreadPool, - AbstractConnectionFactory connectionFactory){ - DirectRabbitListenerContainerFactory factory = new DirectRabbitListenerContainerFactory(); - factory.setConnectionFactory(connectionFactory); - connectionFactory.setExecutor(rabbitThreadPool); - return factory; - } - -} +//package com.mashibing.config; +// +//import org.springframework.amqp.rabbit.config.AbstractRabbitListenerContainerFactory; +//import org.springframework.amqp.rabbit.config.DirectRabbitListenerContainerFactory; +//import org.springframework.amqp.rabbit.connection.AbstractConnectionFactory; +//import org.springframework.context.annotation.Bean; +//import org.springframework.context.annotation.Configuration; +//import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; +// +// +// +//@Configuration +//public class RabbitMQThreadPoolConfig { +// +// @Bean +// public ThreadPoolTaskExecutor rabbitThreadPool(){ +// ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); +// executor.setCorePoolSize(5); +// executor.setMaxPoolSize(5); +// executor.setQueueCapacity(100); +// executor.setThreadNamePrefix("rabbit-"); +// return executor; +// } +// +// +// @Bean +// public AbstractRabbitListenerContainerFactory defaultContainerFactory(ThreadPoolTaskExecutor rabbitThreadPool, +// AbstractConnectionFactory connectionFactory){ +// DirectRabbitListenerContainerFactory factory = new DirectRabbitListenerContainerFactory(); +// factory.setConnectionFactory(connectionFactory); +// connectionFactory.setExecutor(rabbitThreadPool); +// return factory; +// } +// +//} diff --git a/src/main/java/com/mashibing/config/ThreadPoolConfig.java b/src/main/java/com/mashibing/config/ThreadPoolConfig.java index 4ef1e1b..3319d06 100644 --- a/src/main/java/com/mashibing/config/ThreadPoolConfig.java +++ b/src/main/java/com/mashibing/config/ThreadPoolConfig.java @@ -11,6 +11,7 @@ import java.util.concurrent.TimeUnit; /** * 交给Hippo4j-Server管理的线程池 + * * @author zjw * @description */ @@ -19,7 +20,7 @@ public class ThreadPoolConfig { @Bean @DynamicThreadPool - public ThreadPoolExecutor testThreadPool(){ + public ThreadPoolExecutor testThreadPool() { //1、 采用线程池Builder去构建 ThreadPoolExecutor testThreadPool = ThreadPoolBuilder.builder() .corePoolSize(10) @@ -34,4 +35,40 @@ public class ThreadPoolConfig { .build(); return testThreadPool; } + + @Bean + @DynamicThreadPool + public ThreadPoolExecutor ioThreadPool() { + //1、 采用线程池Builder去构建 + ThreadPoolExecutor testThreadPool = ThreadPoolBuilder.builder() + .corePoolSize(10) + .maximumPoolSize(10) + .keepAliveTime(10) + .timeUnit(TimeUnit.SECONDS) + .workQueue(BlockingQueueTypeEnum.RESIZABLE_LINKED_BLOCKING_QUEUE) + .threadFactory("io") + .rejected(new ThreadPoolExecutor.AbortPolicy()) + .threadPoolId("io") + .dynamicPool() + .build(); + return testThreadPool; + } + + @Bean + @DynamicThreadPool + public ThreadPoolExecutor cpuThreadPool() { + //1、 采用线程池Builder去构建 + ThreadPoolExecutor testThreadPool = ThreadPoolBuilder.builder() + .corePoolSize(10) + .maximumPoolSize(10) + .keepAliveTime(10) + .timeUnit(TimeUnit.SECONDS) + .workQueue(BlockingQueueTypeEnum.RESIZABLE_LINKED_BLOCKING_QUEUE) + .threadFactory("cpu") + .rejected(new ThreadPoolExecutor.AbortPolicy()) + .threadPoolId("cpu") + .dynamicPool() + .build(); + return testThreadPool; + } } diff --git a/src/main/java/com/mashibing/controller/Hippo4jController.java b/src/main/java/com/mashibing/controller/Hippo4jController.java new file mode 100644 index 0000000..f6a7947 --- /dev/null +++ b/src/main/java/com/mashibing/controller/Hippo4jController.java @@ -0,0 +1,32 @@ +package com.mashibing.controller; + +import com.mashibing.service.Hippo4jService; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RestController; + +/** + * @author zjw + * @description + */ +@RestController +public class Hippo4jController { + + + @Autowired + private Hippo4jService service; + + + @GetMapping("/cpu") + public String cpu() throws InterruptedException { + Long result = service.doSomeCPUThing(); + return result + ""; + } + + @GetMapping("/io") + public String io() throws Exception { + String result = service.doIOSomeThing(); + return result; + } + +} diff --git a/src/main/java/com/mashibing/mq/TestListener.java b/src/main/java/com/mashibing/mq/TestListener.java index 6fe3087..117396f 100644 --- a/src/main/java/com/mashibing/mq/TestListener.java +++ b/src/main/java/com/mashibing/mq/TestListener.java @@ -1,19 +1,16 @@ -package com.mashibing.mq; - -import com.mashibing.controller.TestController; -import org.springframework.amqp.rabbit.annotation.RabbitListener; -import org.springframework.stereotype.Component; - -/** - * @author zjw - * @description - */ -@Component -public class TestListener { - - @RabbitListener(queues = "hippo4j",containerFactory = "defaultContainerFactory") - public void consume(String message){ - System.out.println(Thread.currentThread().getName() + ":消费消息 --> " + message); - } - -} +//package com.mashibing.mq; +// +//import com.mashibing.controller.TestController; +//import org.springframework.amqp.rabbit.annotation.RabbitListener; +//import org.springframework.stereotype.Component; +// +// +//@Component +//public class TestListener { +// +// @RabbitListener(queues = "hippo4j",containerFactory = "defaultContainerFactory") +// public void consume(String message){ +// System.out.println(Thread.currentThread().getName() + ":消费消息 --> " + message); +// } +// +//} diff --git a/src/main/java/com/mashibing/service/Hippo4jService.java b/src/main/java/com/mashibing/service/Hippo4jService.java new file mode 100644 index 0000000..35b0e25 --- /dev/null +++ b/src/main/java/com/mashibing/service/Hippo4jService.java @@ -0,0 +1,11 @@ +package com.mashibing.service; + +/** + * @author zjw + * @description + */ +public interface Hippo4jService { + String doIOSomeThing() throws Exception; + + Long doSomeCPUThing() throws InterruptedException; +} diff --git a/src/main/java/com/mashibing/service/impl/Hippo4jServiceImpl.java b/src/main/java/com/mashibing/service/impl/Hippo4jServiceImpl.java new file mode 100644 index 0000000..b745b6d --- /dev/null +++ b/src/main/java/com/mashibing/service/impl/Hippo4jServiceImpl.java @@ -0,0 +1,89 @@ +package com.mashibing.service.impl; + +import com.mashibing.service.Hippo4jService; +import lombok.SneakyThrows; +import org.springframework.stereotype.Service; + +import javax.annotation.Resource; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.atomic.AtomicLong; + +/** + * @author zjw + * @description + */ +@Service +public class Hippo4jServiceImpl implements Hippo4jService { + + @Resource + private ThreadPoolExecutor ioThreadPool; + + @Resource + private ThreadPoolExecutor cpuThreadPool; + + @Override + public String doIOSomeThing() throws Exception{ + CountDownLatch latch = new CountDownLatch(3); + Future job1Result = ioThreadPool.submit(() -> { + String result1 = job1(); + latch.countDown(); + return result1; + }); + Future job2Result = ioThreadPool.submit(() -> { + String result2 = job2(); + latch.countDown(); + return result2; + }); + Future job3Result = ioThreadPool.submit(() -> { + String result3 = job3(); + latch.countDown(); + return result3; + }); + latch.await(); + return job1Result.get() + job1Result.get() + job1Result.get(); + } + @SneakyThrows + private String job1() { + Thread.sleep(100); + return "RedisResult!"; + } + @SneakyThrows + private String job2() { + Thread.sleep(200); + return "ServiceResult!"; + } + @SneakyThrows + private String job3() { + Thread.sleep(200); + return "MySQLResult!"; + } + + + + @Override + public Long doSomeCPUThing() throws InterruptedException { + AtomicLong atomicLong = new AtomicLong(0); + CountDownLatch latch = new CountDownLatch(2); + cpuThreadPool.execute(() -> { + incr(atomicLong); + latch.countDown(); + }); + cpuThreadPool.execute(() -> { + incr(atomicLong); + latch.countDown(); + }); + latch.await(); + return atomicLong.get(); + } + private void incr(AtomicLong atomicLong){ + for (int i = 0; i < 5000000; i++) { + atomicLong.incrementAndGet(); + } + } + + + + +} diff --git a/src/main/resources/application-dev.yml b/src/main/resources/application-dev.yml new file mode 100644 index 0000000..b13400d --- /dev/null +++ b/src/main/resources/application-dev.yml @@ -0,0 +1,16 @@ +spring: + application: + name: hippo4j-client + dynamic: + thread-pool: + server-addr: 192.168.11.88:6691 + username: admin + password: 123456 + namespace: mashibing # 租户名称 + item-id: ${spring.application.name} # 项目名称,需要与与服务名称保持一致。 + rabbitmq: + host: 192.168.11.88 + port: 5672 + username: guest + password: guest + virtual-host: / \ No newline at end of file diff --git a/src/main/resources/application-test.yml b/src/main/resources/application-test.yml new file mode 100644 index 0000000..eff2ad7 --- /dev/null +++ b/src/main/resources/application-test.yml @@ -0,0 +1,10 @@ +spring: + application: + name: hippo4j-client + dynamic: + thread-pool: + server-addr: localhost:6691 + username: admin + password: 123456 + namespace: mashibing # 租户名称 + item-id: ${spring.application.name} # 项目名称,需要与与服务名称保持一致。 diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index ea5d2df..caf4dfc 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -1,18 +1,3 @@ spring: profiles: - active: dev - application: - name: hippo4j-client - dynamic: - thread-pool: - server-addr: 192.168.11.88:6691 - username: admin - password: 123456 - namespace: mashibing # 租户名称 - item-id: ${spring.application.name} # 项目名称,需要与与服务名称保持一致。 - rabbitmq: - host: 192.168.11.88 - port: 5672 - username: guest - password: guest - virtual-host: / \ No newline at end of file + active: dev \ No newline at end of file