引入hippo4j动态线程池+测试动态线程池

main
heqijun 3 months ago
parent 711e2a9d8e
commit 077ba1bb3e

@ -0,0 +1,32 @@
package com.mashibing.common.utils;
import com.mashibing.common.pojo.StandardSubmit;
import java.util.concurrent.ConcurrentHashMap;
/**
* @author heqijun
* @ClassName: CMPPSubmitRepoMapUtil
* @Description: StandardSubmit
* @date 2025/6/15 17:07
*/
public class CMPPSubmitRepoMapUtil {
private static final ConcurrentHashMap<String, StandardSubmit> TEMP_SUBMIT = new ConcurrentHashMap<>();
public static StandardSubmit put(int sequence, StandardSubmit standardSubmit) {
return TEMP_SUBMIT.put(sequence + "", standardSubmit);
}
public static StandardSubmit get(int sequence) {
return TEMP_SUBMIT.get(sequence + "");
}
public static StandardSubmit remove(int sequence) {
return TEMP_SUBMIT.remove(sequence + "");
}
private CMPPSubmitRepoMapUtil() {
}
}

@ -63,6 +63,14 @@
<groupId>org.apache.commons</groupId> <groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId> <artifactId>commons-lang3</artifactId>
</dependency> </dependency>
<!-- hippo4j-client -->
<dependency>
<groupId>cn.hippo4j</groupId>
<artifactId>hippo4j-spring-boot-starter</artifactId>
<version>1.5.0</version>
</dependency>
</dependencies> </dependencies>
</project> </project>

@ -1,5 +1,6 @@
package com.mashibing.smsgateway; package com.mashibing.smsgateway;
import cn.hippo4j.core.enable.EnableDynamicThreadPool;
import org.springframework.boot.SpringApplication; import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient; import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
@ -15,6 +16,7 @@ import org.springframework.cloud.openfeign.EnableFeignClients;
@SpringBootApplication @SpringBootApplication
@EnableFeignClients @EnableFeignClients
@EnableDiscoveryClient @EnableDiscoveryClient
@EnableDynamicThreadPool // hippo4j启动类注解 开启动态线程池
public class SmsGatewayApplication { public class SmsGatewayApplication {
public static void main(String[] args) { public static void main(String[] args) {
SpringApplication.run(SmsGatewayApplication.class, args); SpringApplication.run(SmsGatewayApplication.class, args);

@ -0,0 +1,49 @@
package com.mashibing.smsgateway.config;
import cn.hippo4j.core.executor.DynamicThreadPool;
import cn.hippo4j.core.executor.support.ThreadPoolBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.concurrent.ThreadPoolExecutor;
/**
* @author heqijun
* @ClassName: ThreadPoolConfig
* @Description: TODO()
* @date 2025/6/16 20:35
*/
@Configuration
public class ThreadPoolConfig {
@Bean
@DynamicThreadPool
public ThreadPoolExecutor cmppSubmitPool() {
// 短信提交应答
String threadPoolId = "cmpp-submit";
return ThreadPoolBuilder.builder()
// 指定线程名称的前缀
.threadFactory(threadPoolId)
// 线程池在Hippo4j中的唯一标识
.threadPoolId(threadPoolId)
// 代表动态线程池
.dynamicPool()
.build();
}
@Bean
@DynamicThreadPool
public ThreadPoolExecutor cmppDeliverPool() {
//短信状态报告应答
String threadPoolId = "cmpp-deliver";
return ThreadPoolBuilder.builder()
.threadFactory(threadPoolId)
.threadPoolId(threadPoolId)
.dynamicPool()
.build();
}
}

@ -0,0 +1,35 @@
package com.mashibing.smsgateway.controller;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.util.concurrent.ThreadPoolExecutor;
/**
* @author heqijun
* @ClassName: TestController
* @Description: 线
* @date 2025/6/17 16:57
*/
@Slf4j
@RestController
public class TestController {
@Resource
ThreadPoolExecutor cmppSubmitPool;
@Resource
ThreadPoolExecutor cmppDeliverPool;
@GetMapping("test")
public String test() {
cmppSubmitPool.execute(() -> System.out.println("【" + LocalDateTime.now() + "】"
+ Thread.currentThread().getName()));
return "OK";
}
}

@ -1,10 +1,16 @@
package com.mashibing.smsgateway.mq; package com.mashibing.smsgateway.mq;
import com.mashibing.common.pojo.StandardSubmit; import com.mashibing.common.pojo.StandardSubmit;
import com.mashibing.common.utils.CMPPSubmitRepoMapUtil;
import com.mashibing.smsgateway.netty4.NettyClient;
import com.mashibing.smsgateway.netty4.entity.CmppSubmit;
import com.mashibing.smsgateway.netty4.utils.Command;
import com.mashibing.smsgateway.netty4.utils.MsgUtils;
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message; import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.io.IOException; import java.io.IOException;
@ -20,12 +26,28 @@ import java.io.IOException;
@Component @Component
public class SmsGatewayListener { public class SmsGatewayListener {
@Autowired
NettyClient nettyClient;
@RabbitListener(queues = {"${gateway.sendtopic}"}) @RabbitListener(queues = {"${gateway.sendtopic}"})
public void consume(StandardSubmit submit, Channel channel, Message message) throws IOException { public void consume(StandardSubmit submit, Channel channel, Message message) throws IOException {
log.info("【网关模块】接收到消息submit={}", submit); log.info("【网关模块】接收到消息submit={}", submit);
// 完成与运营商交互,发送一次请求,接收两次响应 // 完成与运营商交互,发送一次请求,接收两次响应
String srcId = submit.getSrcNumber();
int sequence = MsgUtils.getSequence();
String mobile = submit.getMobile();
String text = submit.getText();
// 构建CmppSubmit
CmppSubmit cmppSubmit = new CmppSubmit(Command.CMPP2_VERSION, srcId, sequence, mobile, text);
// 临时存储StandardSubmit对象用以在接收到运营商的响应时获取StandardSubmit数据
CMPPSubmitRepoMapUtil.put(sequence, submit);
//发送请求
nettyClient.submit(cmppSubmit);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} }

@ -15,3 +15,11 @@ spring:
server-addr: 192.168.1.13:8848 server-addr: 192.168.1.13:8848
file-extension: yml file-extension: yml
# beacon-smsgateway-dev.yml # beacon-smsgateway-dev.yml
# dynamic:
# thread-pool:
# server-addr: http://localhost:6691
# username: admin
# password: 123456
# namespace: beacon-cloud
# item-id: ${spring.application.name}
Loading…
Cancel
Save