diff --git a/beacon-common/src/main/java/com/mashibing/common/utils/CMPPSubmitRepoMapUtil.java b/beacon-common/src/main/java/com/mashibing/common/utils/CMPPSubmitRepoMapUtil.java new file mode 100644 index 0000000..73f9fc0 --- /dev/null +++ b/beacon-common/src/main/java/com/mashibing/common/utils/CMPPSubmitRepoMapUtil.java @@ -0,0 +1,32 @@ +package com.mashibing.common.utils; + +import com.mashibing.common.pojo.StandardSubmit; + +import java.util.concurrent.ConcurrentHashMap; + +/** + * @author heqijun + * @ClassName: CMPPSubmitRepoMapUtil + * @Description: 临时存储StandardSubmit对象 + * @date 2025/6/15 17:07 + */ + +public class CMPPSubmitRepoMapUtil { + + private static final ConcurrentHashMap TEMP_SUBMIT = new ConcurrentHashMap<>(); + + public static StandardSubmit put(int sequence, StandardSubmit standardSubmit) { + return TEMP_SUBMIT.put(sequence + "", standardSubmit); + } + + public static StandardSubmit get(int sequence) { + return TEMP_SUBMIT.get(sequence + ""); + } + + public static StandardSubmit remove(int sequence) { + return TEMP_SUBMIT.remove(sequence + ""); + } + + private CMPPSubmitRepoMapUtil() { + } +} diff --git a/beacon-smsgateway/pom.xml b/beacon-smsgateway/pom.xml index ed8fccf..20b5114 100644 --- a/beacon-smsgateway/pom.xml +++ b/beacon-smsgateway/pom.xml @@ -63,6 +63,14 @@ org.apache.commons commons-lang3 + + + cn.hippo4j + hippo4j-spring-boot-starter + 1.5.0 + + + \ No newline at end of file diff --git a/beacon-smsgateway/src/main/java/com/mashibing/smsgateway/SmsGatewayApplication.java b/beacon-smsgateway/src/main/java/com/mashibing/smsgateway/SmsGatewayApplication.java index c42b689..5d66cdf 100644 --- a/beacon-smsgateway/src/main/java/com/mashibing/smsgateway/SmsGatewayApplication.java +++ b/beacon-smsgateway/src/main/java/com/mashibing/smsgateway/SmsGatewayApplication.java @@ -1,5 +1,6 @@ package com.mashibing.smsgateway; +import cn.hippo4j.core.enable.EnableDynamicThreadPool; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.client.discovery.EnableDiscoveryClient; @@ -15,6 +16,7 @@ import org.springframework.cloud.openfeign.EnableFeignClients; @SpringBootApplication @EnableFeignClients @EnableDiscoveryClient +@EnableDynamicThreadPool // hippo4j启动类注解 开启动态线程池 public class SmsGatewayApplication { public static void main(String[] args) { SpringApplication.run(SmsGatewayApplication.class, args); diff --git a/beacon-smsgateway/src/main/java/com/mashibing/smsgateway/config/ThreadPoolConfig.java b/beacon-smsgateway/src/main/java/com/mashibing/smsgateway/config/ThreadPoolConfig.java new file mode 100644 index 0000000..7e32cbd --- /dev/null +++ b/beacon-smsgateway/src/main/java/com/mashibing/smsgateway/config/ThreadPoolConfig.java @@ -0,0 +1,49 @@ +package com.mashibing.smsgateway.config; + +import cn.hippo4j.core.executor.DynamicThreadPool; +import cn.hippo4j.core.executor.support.ThreadPoolBuilder; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import java.util.concurrent.ThreadPoolExecutor; + +/** + * @author heqijun + * @ClassName: ThreadPoolConfig + * @Description: TODO(这里用一句话描述这个类的作用) + * @date 2025/6/16 20:35 + */ + +@Configuration +public class ThreadPoolConfig { + + @Bean + @DynamicThreadPool + public ThreadPoolExecutor cmppSubmitPool() { + // 短信提交应答 + String threadPoolId = "cmpp-submit"; + + return ThreadPoolBuilder.builder() + // 指定线程名称的前缀 + .threadFactory(threadPoolId) + // 线程池在Hippo4j中的唯一标识 + .threadPoolId(threadPoolId) + // 代表动态线程池 + .dynamicPool() + .build(); + } + + @Bean + @DynamicThreadPool + public ThreadPoolExecutor cmppDeliverPool() { + //短信状态报告应答 + String threadPoolId = "cmpp-deliver"; + + return ThreadPoolBuilder.builder() + .threadFactory(threadPoolId) + .threadPoolId(threadPoolId) + .dynamicPool() + .build(); + } + +} diff --git a/beacon-smsgateway/src/main/java/com/mashibing/smsgateway/controller/TestController.java b/beacon-smsgateway/src/main/java/com/mashibing/smsgateway/controller/TestController.java new file mode 100644 index 0000000..8441b10 --- /dev/null +++ b/beacon-smsgateway/src/main/java/com/mashibing/smsgateway/controller/TestController.java @@ -0,0 +1,35 @@ +package com.mashibing.smsgateway.controller; + +import lombok.extern.slf4j.Slf4j; + +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RestController; + +import javax.annotation.Resource; +import java.time.LocalDateTime; +import java.util.concurrent.ThreadPoolExecutor; + +/** + * @author heqijun + * @ClassName: TestController + * @Description: 动态线程池测试类 + * @date 2025/6/17 16:57 + */ + +@Slf4j +@RestController +public class TestController { + + @Resource + ThreadPoolExecutor cmppSubmitPool; + + @Resource + ThreadPoolExecutor cmppDeliverPool; + + @GetMapping("test") + public String test() { + cmppSubmitPool.execute(() -> System.out.println("【" + LocalDateTime.now() + "】" + + Thread.currentThread().getName())); + return "OK"; + } +} diff --git a/beacon-smsgateway/src/main/java/com/mashibing/smsgateway/mq/SmsGatewayListener.java b/beacon-smsgateway/src/main/java/com/mashibing/smsgateway/mq/SmsGatewayListener.java index 03b44fd..e4bb13d 100644 --- a/beacon-smsgateway/src/main/java/com/mashibing/smsgateway/mq/SmsGatewayListener.java +++ b/beacon-smsgateway/src/main/java/com/mashibing/smsgateway/mq/SmsGatewayListener.java @@ -1,10 +1,16 @@ package com.mashibing.smsgateway.mq; import com.mashibing.common.pojo.StandardSubmit; +import com.mashibing.common.utils.CMPPSubmitRepoMapUtil; +import com.mashibing.smsgateway.netty4.NettyClient; +import com.mashibing.smsgateway.netty4.entity.CmppSubmit; +import com.mashibing.smsgateway.netty4.utils.Command; +import com.mashibing.smsgateway.netty4.utils.MsgUtils; import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.io.IOException; @@ -20,12 +26,28 @@ import java.io.IOException; @Component public class SmsGatewayListener { + @Autowired + NettyClient nettyClient; + @RabbitListener(queues = {"${gateway.sendtopic}"}) public void consume(StandardSubmit submit, Channel channel, Message message) throws IOException { log.info("【网关模块】接收到消息,submit={}", submit); // 完成与运营商交互,发送一次请求,接收两次响应 + String srcId = submit.getSrcNumber(); + int sequence = MsgUtils.getSequence(); + String mobile = submit.getMobile(); + String text = submit.getText(); + // 构建CmppSubmit + CmppSubmit cmppSubmit = new CmppSubmit(Command.CMPP2_VERSION, srcId, sequence, mobile, text); + + // 临时存储StandardSubmit对象,用以在接收到运营商的响应时获取StandardSubmit数据 + CMPPSubmitRepoMapUtil.put(sequence, submit); + + //发送请求 + nettyClient.submit(cmppSubmit); + channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } diff --git a/beacon-smsgateway/src/main/resources/bootstrap.yml b/beacon-smsgateway/src/main/resources/bootstrap.yml index 58dcbfb..b665fca 100644 --- a/beacon-smsgateway/src/main/resources/bootstrap.yml +++ b/beacon-smsgateway/src/main/resources/bootstrap.yml @@ -14,4 +14,12 @@ spring: config: server-addr: 192.168.1.13:8848 file-extension: yml - # beacon-smsgateway-dev.yml \ No newline at end of file + # beacon-smsgateway-dev.yml + +# dynamic: +# thread-pool: +# server-addr: http://localhost:6691 +# username: admin +# password: 123456 +# namespace: beacon-cloud +# item-id: ${spring.application.name} \ No newline at end of file