From a1b50f82f2c0c5c2dee1889e39c833b39bdadf37 Mon Sep 17 00:00:00 2001
From: msb_221930 <65ddd7e7>
Date: Tue, 26 Mar 2024 17:57:01 +0800
Subject: [PATCH] =?UTF-8?q?=E5=B0=8F=E9=99=A2=203.26=2017:55?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
.../mashibing/push/mq/PushReportListener.java | 3 +-
beacon-smsgateway/pom.xml | 20 +-
.../smsgateway/config/RabbitMQConfig.java | 29 ++
.../smsgateway/mq/SmsGatewayListener.java | 11 +-
.../smsgateway/netty4/CMPPDecoder.java | 70 ++++
.../smsgateway/netty4/CMPPEncoder.java | 36 ++
.../smsgateway/netty4/CMPPHandler.java | 48 +++
.../smsgateway/netty4/HeartHandler.java | 45 +++
.../smsgateway/netty4/NettyClient.java | 136 +++++++
.../netty4/NettyClientInitializer.java | 40 ++
.../smsgateway/netty4/NettyStartCMPP.java | 26 ++
.../netty4/entity/CmppActiveTest.java | 26 ++
.../netty4/entity/CmppActiveTestResp.java | 27 ++
.../smsgateway/netty4/entity/CmppConnect.java | 44 +++
.../smsgateway/netty4/entity/CmppDeliver.java | 368 ++++++++++++++++++
.../netty4/entity/CmppMessageHeader.java | 43 ++
.../smsgateway/netty4/entity/CmppSubmit.java | 148 +++++++
.../netty4/entity/CmppSubmitResp.java | 38 ++
.../smsgateway/netty4/utils/Command.java | 150 +++++++
.../smsgateway/netty4/utils/MsgUtils.java | 216 ++++++++++
20 files changed, 1520 insertions(+), 4 deletions(-)
create mode 100644 beacon-smsgateway/src/main/java/com/mashibing/smsgateway/config/RabbitMQConfig.java
create mode 100644 beacon-smsgateway/src/main/java/com/mashibing/smsgateway/netty4/CMPPDecoder.java
create mode 100644 beacon-smsgateway/src/main/java/com/mashibing/smsgateway/netty4/CMPPEncoder.java
create mode 100644 beacon-smsgateway/src/main/java/com/mashibing/smsgateway/netty4/CMPPHandler.java
create mode 100644 beacon-smsgateway/src/main/java/com/mashibing/smsgateway/netty4/HeartHandler.java
create mode 100644 beacon-smsgateway/src/main/java/com/mashibing/smsgateway/netty4/NettyClient.java
create mode 100644 beacon-smsgateway/src/main/java/com/mashibing/smsgateway/netty4/NettyClientInitializer.java
create mode 100644 beacon-smsgateway/src/main/java/com/mashibing/smsgateway/netty4/NettyStartCMPP.java
create mode 100644 beacon-smsgateway/src/main/java/com/mashibing/smsgateway/netty4/entity/CmppActiveTest.java
create mode 100644 beacon-smsgateway/src/main/java/com/mashibing/smsgateway/netty4/entity/CmppActiveTestResp.java
create mode 100644 beacon-smsgateway/src/main/java/com/mashibing/smsgateway/netty4/entity/CmppConnect.java
create mode 100644 beacon-smsgateway/src/main/java/com/mashibing/smsgateway/netty4/entity/CmppDeliver.java
create mode 100644 beacon-smsgateway/src/main/java/com/mashibing/smsgateway/netty4/entity/CmppMessageHeader.java
create mode 100644 beacon-smsgateway/src/main/java/com/mashibing/smsgateway/netty4/entity/CmppSubmit.java
create mode 100644 beacon-smsgateway/src/main/java/com/mashibing/smsgateway/netty4/entity/CmppSubmitResp.java
create mode 100644 beacon-smsgateway/src/main/java/com/mashibing/smsgateway/netty4/utils/Command.java
create mode 100644 beacon-smsgateway/src/main/java/com/mashibing/smsgateway/netty4/utils/MsgUtils.java
diff --git a/beacon-push/src/main/java/com/mashibing/push/mq/PushReportListener.java b/beacon-push/src/main/java/com/mashibing/push/mq/PushReportListener.java
index f5a6807..33b9eac 100644
--- a/beacon-push/src/main/java/com/mashibing/push/mq/PushReportListener.java
+++ b/beacon-push/src/main/java/com/mashibing/push/mq/PushReportListener.java
@@ -114,6 +114,7 @@ public class PushReportListener {
String result = restTemplate.postForObject("http://" + report.getCallbackUrl(), new HttpEntity<>(body, httpHeaders), String.class);
flag = SUCCESS.equals(result);
} catch (RestClientException e) {
+ log.info("推送失败。");
}
//3、得到响应后,确认是否为SUCCESS
return flag;
@@ -143,4 +144,4 @@ public class PushReportListener {
}
-}
\ No newline at end of file
+}
diff --git a/beacon-smsgateway/pom.xml b/beacon-smsgateway/pom.xml
index a47bbc1..69d9416 100644
--- a/beacon-smsgateway/pom.xml
+++ b/beacon-smsgateway/pom.xml
@@ -45,6 +45,24 @@
org.springframework.boot
spring-boot-starter-amqp
+
+
+
+
+
+ io.netty
+ netty-all
+ 4.1.69.Final
+
+
+
+ org.apache.commons
+ commons-lang3
+ 3.12.0
+
+
+
+
-
\ No newline at end of file
+
diff --git a/beacon-smsgateway/src/main/java/com/mashibing/smsgateway/config/RabbitMQConfig.java b/beacon-smsgateway/src/main/java/com/mashibing/smsgateway/config/RabbitMQConfig.java
new file mode 100644
index 0000000..4092b14
--- /dev/null
+++ b/beacon-smsgateway/src/main/java/com/mashibing/smsgateway/config/RabbitMQConfig.java
@@ -0,0 +1,29 @@
+package com.mashibing.smsgateway.config;
+
+import org.springframework.amqp.core.AcknowledgeMode;
+import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
+import org.springframework.amqp.rabbit.connection.ConnectionFactory;
+import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+/**
+ * @author dch
+ * @create 2024-03-26 14:29
+ * 针对性的配置可以用这种方式
+ */
+//@Configuration
+public class RabbitMQConfig {
+
+ @Bean
+ public SimpleRabbitListenerContainerFactory gatewayContainerFactory(ConnectionFactory connectionFactory,
+ SimpleRabbitListenerContainerFactoryConfigurer configurer){
+ SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory = new SimpleRabbitListenerContainerFactory();
+ simpleRabbitListenerContainerFactory.setConcurrentConsumers(5);
+ simpleRabbitListenerContainerFactory.setPrefetchCount(10);
+ simpleRabbitListenerContainerFactory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
+ configurer.configure(simpleRabbitListenerContainerFactory,connectionFactory);
+ return simpleRabbitListenerContainerFactory;
+ }
+
+}
diff --git a/beacon-smsgateway/src/main/java/com/mashibing/smsgateway/mq/SmsGatewayListener.java b/beacon-smsgateway/src/main/java/com/mashibing/smsgateway/mq/SmsGatewayListener.java
index cbf665a..61cd798 100644
--- a/beacon-smsgateway/src/main/java/com/mashibing/smsgateway/mq/SmsGatewayListener.java
+++ b/beacon-smsgateway/src/main/java/com/mashibing/smsgateway/mq/SmsGatewayListener.java
@@ -18,12 +18,19 @@ import java.io.IOException;
public class SmsGatewayListener {
@RabbitListener(queues = "${gateway.sendtopic}")
- public void consume(StandardSubmit submit, Channel channel, Message message) throws IOException, InterruptedException {
+// public void consume(StandardSubmit submit, Channel channel, Message message) throws IOException, InterruptedException {
+ public void consume(String submit, Channel channel, Message message) throws IOException, InterruptedException {
+ Thread.sleep(1000);
log.info("【短信网关模块】 接收到消息 submit = {}", submit);
+
// =====================完成运营商交互,发送一次请求,接收两次响应==========================
+
+
+ //long deliveryTag 消息的唯一标识。每条消息都有自己的ID号,用于标识该消息在channel中的顺序。当消费者接收到消息后,需要调用channel.basicAck方法并传递deliveryTag来确认消息的处理。
+ //boolean multiple 是否批量确认消息,当传false时,只确认当前 deliveryTag对应的消息;当传true时,会确认当前及之前所有未确认的消息。
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
-}
\ No newline at end of file
+}
diff --git a/beacon-smsgateway/src/main/java/com/mashibing/smsgateway/netty4/CMPPDecoder.java b/beacon-smsgateway/src/main/java/com/mashibing/smsgateway/netty4/CMPPDecoder.java
new file mode 100644
index 0000000..e9778ae
--- /dev/null
+++ b/beacon-smsgateway/src/main/java/com/mashibing/smsgateway/netty4/CMPPDecoder.java
@@ -0,0 +1,70 @@
+package com.mashibing.smsgateway.netty4;
+
+import com.mashibing.smsgateway.netty4.entity.CmppActiveTestResp;
+import com.mashibing.smsgateway.netty4.entity.CmppDeliver;
+import com.mashibing.smsgateway.netty4.entity.CmppSubmitResp;
+import com.mashibing.smsgateway.netty4.utils.Command;
+import com.mashibing.smsgateway.netty4.utils.MsgUtils;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.ByteToMessageDecoder;
+import org.apache.commons.lang3.ArrayUtils;
+
+import java.util.List;
+
+/**
+ * 中国移动给咱们响应信息时,通过当前Decoder接收并做数据的解析
+ */
+public class CMPPDecoder extends ByteToMessageDecoder {
+
+
+
+ @Override
+ protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List