From 077ba1bb3e5bd8842579b2c7672b374b423119b1 Mon Sep 17 00:00:00 2001 From: heqijun Date: Tue, 17 Jun 2025 17:11:52 +0800 Subject: [PATCH] =?UTF-8?q?=E5=BC=95=E5=85=A5hippo4j=E5=8A=A8=E6=80=81?= =?UTF-8?q?=E7=BA=BF=E7=A8=8B=E6=B1=A0+=E6=B5=8B=E8=AF=95=E5=8A=A8?= =?UTF-8?q?=E6=80=81=E7=BA=BF=E7=A8=8B=E6=B1=A0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../common/utils/CMPPSubmitRepoMapUtil.java | 32 ++++++++++++ beacon-smsgateway/pom.xml | 8 +++ .../smsgateway/SmsGatewayApplication.java | 2 + .../smsgateway/config/ThreadPoolConfig.java | 49 +++++++++++++++++++ .../smsgateway/controller/TestController.java | 35 +++++++++++++ .../smsgateway/mq/SmsGatewayListener.java | 22 +++++++++ .../src/main/resources/bootstrap.yml | 10 +++- 7 files changed, 157 insertions(+), 1 deletion(-) create mode 100644 beacon-common/src/main/java/com/mashibing/common/utils/CMPPSubmitRepoMapUtil.java create mode 100644 beacon-smsgateway/src/main/java/com/mashibing/smsgateway/config/ThreadPoolConfig.java create mode 100644 beacon-smsgateway/src/main/java/com/mashibing/smsgateway/controller/TestController.java diff --git a/beacon-common/src/main/java/com/mashibing/common/utils/CMPPSubmitRepoMapUtil.java b/beacon-common/src/main/java/com/mashibing/common/utils/CMPPSubmitRepoMapUtil.java new file mode 100644 index 0000000..73f9fc0 --- /dev/null +++ b/beacon-common/src/main/java/com/mashibing/common/utils/CMPPSubmitRepoMapUtil.java @@ -0,0 +1,32 @@ +package com.mashibing.common.utils; + +import com.mashibing.common.pojo.StandardSubmit; + +import java.util.concurrent.ConcurrentHashMap; + +/** + * @author heqijun + * @ClassName: CMPPSubmitRepoMapUtil + * @Description: 临时存储StandardSubmit对象 + * @date 2025/6/15 17:07 + */ + +public class CMPPSubmitRepoMapUtil { + + private static final ConcurrentHashMap TEMP_SUBMIT = new ConcurrentHashMap<>(); + + public static StandardSubmit put(int sequence, StandardSubmit standardSubmit) { + return TEMP_SUBMIT.put(sequence + "", standardSubmit); + } + + public static StandardSubmit get(int sequence) { + return TEMP_SUBMIT.get(sequence + ""); + } + + public static StandardSubmit remove(int sequence) { + return TEMP_SUBMIT.remove(sequence + ""); + } + + private CMPPSubmitRepoMapUtil() { + } +} diff --git a/beacon-smsgateway/pom.xml b/beacon-smsgateway/pom.xml index ed8fccf..20b5114 100644 --- a/beacon-smsgateway/pom.xml +++ b/beacon-smsgateway/pom.xml @@ -63,6 +63,14 @@ org.apache.commons commons-lang3 + + + cn.hippo4j + hippo4j-spring-boot-starter + 1.5.0 + + + \ No newline at end of file diff --git a/beacon-smsgateway/src/main/java/com/mashibing/smsgateway/SmsGatewayApplication.java b/beacon-smsgateway/src/main/java/com/mashibing/smsgateway/SmsGatewayApplication.java index c42b689..5d66cdf 100644 --- a/beacon-smsgateway/src/main/java/com/mashibing/smsgateway/SmsGatewayApplication.java +++ b/beacon-smsgateway/src/main/java/com/mashibing/smsgateway/SmsGatewayApplication.java @@ -1,5 +1,6 @@ package com.mashibing.smsgateway; +import cn.hippo4j.core.enable.EnableDynamicThreadPool; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.client.discovery.EnableDiscoveryClient; @@ -15,6 +16,7 @@ import org.springframework.cloud.openfeign.EnableFeignClients; @SpringBootApplication @EnableFeignClients @EnableDiscoveryClient +@EnableDynamicThreadPool // hippo4j启动类注解 开启动态线程池 public class SmsGatewayApplication { public static void main(String[] args) { SpringApplication.run(SmsGatewayApplication.class, args); diff --git a/beacon-smsgateway/src/main/java/com/mashibing/smsgateway/config/ThreadPoolConfig.java b/beacon-smsgateway/src/main/java/com/mashibing/smsgateway/config/ThreadPoolConfig.java new file mode 100644 index 0000000..7e32cbd --- /dev/null +++ b/beacon-smsgateway/src/main/java/com/mashibing/smsgateway/config/ThreadPoolConfig.java @@ -0,0 +1,49 @@ +package com.mashibing.smsgateway.config; + +import cn.hippo4j.core.executor.DynamicThreadPool; +import cn.hippo4j.core.executor.support.ThreadPoolBuilder; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import java.util.concurrent.ThreadPoolExecutor; + +/** + * @author heqijun + * @ClassName: ThreadPoolConfig + * @Description: TODO(这里用一句话描述这个类的作用) + * @date 2025/6/16 20:35 + */ + +@Configuration +public class ThreadPoolConfig { + + @Bean + @DynamicThreadPool + public ThreadPoolExecutor cmppSubmitPool() { + // 短信提交应答 + String threadPoolId = "cmpp-submit"; + + return ThreadPoolBuilder.builder() + // 指定线程名称的前缀 + .threadFactory(threadPoolId) + // 线程池在Hippo4j中的唯一标识 + .threadPoolId(threadPoolId) + // 代表动态线程池 + .dynamicPool() + .build(); + } + + @Bean + @DynamicThreadPool + public ThreadPoolExecutor cmppDeliverPool() { + //短信状态报告应答 + String threadPoolId = "cmpp-deliver"; + + return ThreadPoolBuilder.builder() + .threadFactory(threadPoolId) + .threadPoolId(threadPoolId) + .dynamicPool() + .build(); + } + +} diff --git a/beacon-smsgateway/src/main/java/com/mashibing/smsgateway/controller/TestController.java b/beacon-smsgateway/src/main/java/com/mashibing/smsgateway/controller/TestController.java new file mode 100644 index 0000000..8441b10 --- /dev/null +++ b/beacon-smsgateway/src/main/java/com/mashibing/smsgateway/controller/TestController.java @@ -0,0 +1,35 @@ +package com.mashibing.smsgateway.controller; + +import lombok.extern.slf4j.Slf4j; + +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RestController; + +import javax.annotation.Resource; +import java.time.LocalDateTime; +import java.util.concurrent.ThreadPoolExecutor; + +/** + * @author heqijun + * @ClassName: TestController + * @Description: 动态线程池测试类 + * @date 2025/6/17 16:57 + */ + +@Slf4j +@RestController +public class TestController { + + @Resource + ThreadPoolExecutor cmppSubmitPool; + + @Resource + ThreadPoolExecutor cmppDeliverPool; + + @GetMapping("test") + public String test() { + cmppSubmitPool.execute(() -> System.out.println("【" + LocalDateTime.now() + "】" + + Thread.currentThread().getName())); + return "OK"; + } +} diff --git a/beacon-smsgateway/src/main/java/com/mashibing/smsgateway/mq/SmsGatewayListener.java b/beacon-smsgateway/src/main/java/com/mashibing/smsgateway/mq/SmsGatewayListener.java index 03b44fd..e4bb13d 100644 --- a/beacon-smsgateway/src/main/java/com/mashibing/smsgateway/mq/SmsGatewayListener.java +++ b/beacon-smsgateway/src/main/java/com/mashibing/smsgateway/mq/SmsGatewayListener.java @@ -1,10 +1,16 @@ package com.mashibing.smsgateway.mq; import com.mashibing.common.pojo.StandardSubmit; +import com.mashibing.common.utils.CMPPSubmitRepoMapUtil; +import com.mashibing.smsgateway.netty4.NettyClient; +import com.mashibing.smsgateway.netty4.entity.CmppSubmit; +import com.mashibing.smsgateway.netty4.utils.Command; +import com.mashibing.smsgateway.netty4.utils.MsgUtils; import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.io.IOException; @@ -20,12 +26,28 @@ import java.io.IOException; @Component public class SmsGatewayListener { + @Autowired + NettyClient nettyClient; + @RabbitListener(queues = {"${gateway.sendtopic}"}) public void consume(StandardSubmit submit, Channel channel, Message message) throws IOException { log.info("【网关模块】接收到消息,submit={}", submit); // 完成与运营商交互,发送一次请求,接收两次响应 + String srcId = submit.getSrcNumber(); + int sequence = MsgUtils.getSequence(); + String mobile = submit.getMobile(); + String text = submit.getText(); + // 构建CmppSubmit + CmppSubmit cmppSubmit = new CmppSubmit(Command.CMPP2_VERSION, srcId, sequence, mobile, text); + + // 临时存储StandardSubmit对象,用以在接收到运营商的响应时获取StandardSubmit数据 + CMPPSubmitRepoMapUtil.put(sequence, submit); + + //发送请求 + nettyClient.submit(cmppSubmit); + channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } diff --git a/beacon-smsgateway/src/main/resources/bootstrap.yml b/beacon-smsgateway/src/main/resources/bootstrap.yml index 58dcbfb..b665fca 100644 --- a/beacon-smsgateway/src/main/resources/bootstrap.yml +++ b/beacon-smsgateway/src/main/resources/bootstrap.yml @@ -14,4 +14,12 @@ spring: config: server-addr: 192.168.1.13:8848 file-extension: yml - # beacon-smsgateway-dev.yml \ No newline at end of file + # beacon-smsgateway-dev.yml + +# dynamic: +# thread-pool: +# server-addr: http://localhost:6691 +# username: admin +# password: 123456 +# namespace: beacon-cloud +# item-id: ${spring.application.name} \ No newline at end of file