网关模块对接CMPP,实现通讯,并做第一次回调

master
Administrator 2 years ago
parent f8937b3af6
commit 76574a4e42

@ -1,6 +1,5 @@
package com.mashibing.api.filter;
import com.mashibing.api.form.SingleSendForm;
import com.mashibing.common.model.StandardSubmit;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;

@ -0,0 +1,30 @@
package com.mashibing.common.enums;
import lombok.Getter;
/**
* @author zjw
* @description
*/
@Getter
public enum CMPP2DeliverEnums {
DELIVERED("DELIVRD","Message is delivered to destination"),
EXPIRED("EXPIRED","Message validity period has expired"),
DELETED("DELETED","Message has been deleted."),
UNDELIVERABLE("UNDELIV","Message is undeliverable"),
ACCEPTED("ACCEPTD","Message is in accepted state"),
UNKNOWN("UNKNOWN","Message is in invalid state"),
REJECTED("REJECTD","Message is in a rejected state"),
;
private String stat;
private String description;
CMPP2DeliverEnums(String stat, String description) {
this.stat = stat;
this.description = description;
}
}

@ -0,0 +1,32 @@
package com.mashibing.common.enums;
import lombok.Getter;
/**
* @author zjw
* @description
*/
@Getter
public enum CMPP2ResultEnums {
OK(0,"正确"),
MESSAGE_BUILD_ERROR(1,"消息结构错"),
COMMAND_WORD_ERROR(2,"命令字错"),
MESSAGE_SEQUENCE_ERROR(3,"消息序号重复"),
MESSAGE_LENGTH_ERROR(4,"消息长度错"),
INCORRECT_TARIFF_CODE(5,"资费代码错"),
Exceeding_maximum_message_length(6,"超过最大信息长"),
BUSINESS_CODE_ERROR(7,"业务代码错"),
FLOW_CONTROL_ERROR(8,"流量控制错"),
UNKNOWN(9,"其他错误")
;
private Integer result;
private String msg;
CMPP2ResultEnums(Integer result, String msg) {
this.result = result;
this.msg = msg;
}
}

@ -21,6 +21,11 @@ import java.time.LocalDateTime;
@AllArgsConstructor
public class StandardReport implements Serializable {
/**
* 便
*/
private String apikey;
/**
*
*/

@ -0,0 +1,34 @@
package com.mashibing.common.util;
import com.mashibing.common.enums.CMPP2DeliverEnums;
import com.mashibing.common.enums.CMPP2ResultEnums;
import java.util.HashMap;
import java.util.Map;
/**
* @author zjw
* @description
*/
public class CMPP2DeliverUtil {
private static Map<String,String> stats = new HashMap<>();
static{
CMPP2DeliverEnums[] cmpp2DeliverEnums = CMPP2DeliverEnums.values();
for (CMPP2DeliverEnums cmpp2DeliverEnum : cmpp2DeliverEnums) {
stats.put(cmpp2DeliverEnum.getStat(),cmpp2DeliverEnum.getDescription());
}
}
/**
* result
* @param stat
* @return
*/
public static String getResultMessage(String stat){
return stats.get(stat);
}
}

@ -0,0 +1,34 @@
package com.mashibing.common.util;
import com.mashibing.common.enums.CMPP2ResultEnums;
import com.mashibing.common.enums.MobileOperatorEnum;
import java.util.HashMap;
import java.util.Map;
/**
* @author zjw
* @description
*/
public class CMPP2ResultUtil {
private static Map<Integer,String> results = new HashMap<>();
static{
CMPP2ResultEnums[] cmpp2ResultEnums = CMPP2ResultEnums.values();
for (CMPP2ResultEnums cmpp2ResultEnum : cmpp2ResultEnums) {
results.put(cmpp2ResultEnum.getResult(),cmpp2ResultEnum.getMsg());
}
}
/**
* result
* @param result
* @return
*/
public static String getResultMessage(Integer result){
return results.get(result);
}
}

@ -0,0 +1,31 @@
package com.mashibing.common.util;
import com.mashibing.common.model.StandardReport;
import com.mashibing.common.model.StandardSubmit;
import java.util.concurrent.ConcurrentHashMap;
/**
* CMPP
* @author zjw
* @description
*/
public class CMPPDeliverMapUtil {
private static ConcurrentHashMap<String, StandardReport> map = new ConcurrentHashMap<>();
public static void put(String msgId,StandardReport submit){
map.put(msgId,submit);
}
public static StandardReport get(String msgId){
return map.get(msgId);
}
public static StandardReport remove(String msgId){
return map.remove(msgId);
}
}

@ -0,0 +1,30 @@
package com.mashibing.common.util;
import com.mashibing.common.model.StandardSubmit;
import java.util.concurrent.ConcurrentHashMap;
/**
* CMPP
* @author zjw
* @description
*/
public class CMPPSubmitRepoMapUtil {
private static ConcurrentHashMap<String, StandardSubmit> map = new ConcurrentHashMap<>();
public static void put(int sequence,StandardSubmit submit){
map.put(sequence + "",submit);
}
public static StandardSubmit get(int sequence){
return map.get(sequence + "");
}
public static StandardSubmit remove(int sequence){
return map.remove(sequence + "");
}
}

@ -126,6 +126,9 @@ public class PushReportListener {
if(!flag){
log.info("【推送模块-推送状态报告】 第{}次推送状态报告失败report = {}",report.getResendCount() + 1,report);
report.setResendCount(report.getResendCount() + 1);
if(report.getResendCount() >= 5){
return;
}
rabbitTemplate.convertAndSend(RabbitMQConfig.DELAYED_EXCHANGE, "", report, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {

@ -40,6 +40,29 @@
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!-- netty4依赖-->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.69.Final</version>
</dependency>
<!-- commons-lang3工具类-->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.12.0</version>
</dependency>
<!-- hippo4j-client的依赖-->
<dependency>
<groupId>cn.hippo4j</groupId>
<artifactId>hippo4j-spring-boot-starter</artifactId>
<version>1.5.0</version>
</dependency>
<!-- OpenFeign调用缓存-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>
</dependencies>

@ -1,8 +1,10 @@
package com.mashibing.smsgateway;
import cn.hippo4j.core.enable.EnableDynamicThreadPool;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.openfeign.EnableFeignClients;
/**
* @author zjw
@ -10,10 +12,11 @@ import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
*/
@SpringBootApplication
@EnableDiscoveryClient
@EnableDynamicThreadPool
@EnableFeignClients
public class SmsGatewayStarterApp {
public static void main(String[] args) {
SpringApplication.run(SmsGatewayStarterApp.class,args);
}
}

@ -0,0 +1,22 @@
package com.mashibing.smsgateway.client;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.*;
import java.util.Map;
import java.util.Set;
/**
* @author zjw
* @description
*/
@FeignClient(value = "beacon-cache")
public interface BeaconCacheClient {
@GetMapping("/cache/hget/{key}/{field}")
Integer hgetInteger(@PathVariable(value = "key") String key, @PathVariable(value = "field") String field);
@GetMapping("/cache/hget/{key}/{field}")
String hget(@PathVariable(value = "key")String key, @PathVariable(value = "field")String field);
}

@ -4,8 +4,6 @@ import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* ~

@ -0,0 +1,44 @@
package com.mashibing.smsgateway.config;
import cn.hippo4j.core.executor.DynamicThreadPool;
import cn.hippo4j.core.executor.support.ThreadPoolBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.concurrent.ThreadPoolExecutor;
/**
* @author zjw
* @description
*/
@Configuration
public class ThreadPoolConfig {
@Bean
@DynamicThreadPool
public ThreadPoolExecutor cmppSubmitPool() {
String threadPoolId = "cmpp-submit";
ThreadPoolExecutor messageConsumeDynamicExecutor = ThreadPoolBuilder.builder()
// 指定线程名称的前缀
.threadFactory(threadPoolId)
// 线程池在Hippo4j中的唯一标识
.threadPoolId(threadPoolId)
// 代表动态线程池
.dynamicPool()
.build();
return messageConsumeDynamicExecutor;
}
@Bean
@DynamicThreadPool
public ThreadPoolExecutor cmppDeliverPool() {
String threadPoolId = "cmpp-deliver";
ThreadPoolExecutor messageProduceDynamicExecutor = ThreadPoolBuilder.builder()
.threadFactory(threadPoolId)
.threadPoolId(threadPoolId)
.dynamicPool()
.build();
return messageProduceDynamicExecutor;
}
}

@ -0,0 +1,28 @@
package com.mashibing.smsgateway.controller;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.util.concurrent.ThreadPoolExecutor;
/**
* @author zjw
* @description
*/
@RestController
public class TestController {
@Resource
private ThreadPoolExecutor cmppSubmitPool;
@GetMapping("/test")
public String test(){
cmppSubmitPool.execute(() -> {
System.out.println(Thread.currentThread().getName());
});
return "ok!";
}
}

@ -1,10 +1,16 @@
package com.mashibing.smsgateway.mq;
import com.mashibing.common.model.StandardSubmit;
import com.mashibing.common.util.CMPPSubmitRepoMapUtil;
import com.mashibing.smsgateway.netty4.NettyClient;
import com.mashibing.smsgateway.netty4.entity.CmppSubmit;
import com.mashibing.smsgateway.netty4.utils.Command;
import com.mashibing.smsgateway.netty4.utils.MsgUtils;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.io.IOException;
@ -18,10 +24,26 @@ import java.io.IOException;
@Slf4j
public class SmsGatewayListener {
@RabbitListener(queues = "${gateway.sendtopic}",containerFactory = "gatewayContainerFactory")
@Autowired
private NettyClient nettyClient;
@RabbitListener(queues = "${gateway.sendtopic}")
public void consume(StandardSubmit submit, Channel channel, Message message) throws IOException, InterruptedException {
log.info("【短信网关模块】 接收到消息 submit = {}",submit);
// =====================完成运营商交互,发送一次请求,接收两次响应==========================
//1、获取需要的核心属性
String srcNumber = submit.getSrcNumber();
String mobile = submit.getMobile();
String text = submit.getText();
// 这个序列是基于++实现的当取值达到MAX时会被重置这个值是可以重复利用的。
int sequence = MsgUtils.getSequence();
//2、声明发送短息时需要的CMPPSubmit对象
CmppSubmit cmppSubmit = new CmppSubmit(Command.CMPP2_VERSION,srcNumber,sequence,mobile,text);
//3、将submit对象做一个临时存储在运营商第一次响应时可以获取到。
// 如果怕出问题服务器宕机数据丢失可以上Redis~~~
CMPPSubmitRepoMapUtil.put(sequence,submit);
//4、和运营商交互发送短信
nettyClient.submit(cmppSubmit);
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}

@ -0,0 +1,70 @@
package com.mashibing.smsgateway.netty4;
import com.mashibing.smsgateway.netty4.entity.CmppActiveTestResp;
import com.mashibing.smsgateway.netty4.entity.CmppDeliver;
import com.mashibing.smsgateway.netty4.entity.CmppSubmitResp;
import com.mashibing.smsgateway.netty4.utils.Command;
import com.mashibing.smsgateway.netty4.utils.MsgUtils;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import org.apache.commons.lang3.ArrayUtils;
import java.util.List;
/**
* Decoder
*/
public class CMPPDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list){
//字节数组
byte[] buf = new byte[byteBuf.readableBytes()];
//读取数据到字节数组
byteBuf.readBytes(buf);
//开始解析数据,先提取出长度字段标识长度的数据,也就是该条消息
//4位 消息长度
int totalLength = MsgUtils.bytesToInt(ArrayUtils.subarray(buf, 0, 4));
//获取到该长度的字节数组
byte[] bytes = ArrayUtils.subarray(buf, 0, totalLength);
//获取到响应类型,也就是哪个接口的响应,4位
int commandId = MsgUtils.bytesToInt(ArrayUtils.subarray(bytes, 4, 8));
//连接请求响应
switch (commandId) {
case Command.CMPP_ACTIVE_TEST:
System.out.println("-----------------接收到链路检测-----------------");
channelHandlerContext.writeAndFlush(new CmppActiveTestResp());
break;
case Command.CMPP_ACTIVE_TEST_RESP:
System.out.println("--------------接收到链路应答--------------");
break;
case Command.CMPP_DELIVER:
System.out.println("-------------状态报告---------------");
CmppDeliver deliver=new CmppDeliver(bytes);
list.add(deliver);
break;
case Command.CMPP_SUBMIT_RESP:
System.out.println("-------------接收到短信提交应答-------------");
CmppSubmitResp submitResp=new CmppSubmitResp(bytes);
list.add(submitResp);
break;
case Command.CMPP_QUERY_RESP:
System.out.println("-------------接收到短信查询应答-------------");
break;
case Command.CMPP_CONNECT_RESP:
System.out.println("-------------请求连接应答-------------");
//服务器端告诉客户端已接受你的连接
break;
default:
System.out.println("暂无解析"+commandId);
break;
}
//list.add(commandId);
}
}

@ -0,0 +1,36 @@
package com.mashibing.smsgateway.netty4;
import com.mashibing.smsgateway.netty4.entity.CmppMessageHeader;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
/**
*
*/
public class CMPPEncoder extends MessageToByteEncoder<Object> {
public CMPPEncoder(){
super(false);
}
@Override
protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
if (msg instanceof byte[]){
out.writeBytes((byte[])msg);
}else if(msg instanceof Integer){
out.writeInt((Integer)msg);
}else if(msg instanceof Byte){
out.writeByte((Byte)msg);
}else if(msg instanceof Long){
out.writeLong((Long)msg);
}else if(msg instanceof String){
out.writeBytes(((String)msg).getBytes("UTF-16BE"));
}else if (msg instanceof Character){
out.writeChar((Character)msg);
}else if (msg instanceof CmppMessageHeader){
CmppMessageHeader c=(CmppMessageHeader)msg;
out.writeBytes(c.toByteArray());
}
}
}

@ -0,0 +1,65 @@
package com.mashibing.smsgateway.netty4;
import com.mashibing.common.constant.SmsConstant;
import com.mashibing.common.model.StandardReport;
import com.mashibing.common.model.StandardSubmit;
import com.mashibing.common.util.CMPP2ResultUtil;
import com.mashibing.common.util.CMPPSubmitRepoMapUtil;
import com.mashibing.smsgateway.netty4.entity.CmppDeliver;
import com.mashibing.smsgateway.netty4.entity.CmppSubmitResp;
import com.mashibing.smsgateway.netty4.utils.MsgUtils;
import com.mashibing.smsgateway.runnable.DeliverRunnable;
import com.mashibing.smsgateway.runnable.SubmitRepoRunnable;
import com.mashibing.smsgateway.util.SpringUtil;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.concurrent.ThreadPoolExecutor;
/**
* handler,
*/
@Slf4j
public class CMPPHandler extends SimpleChannelInboundHandler {
@Override
protected void channelRead0(ChannelHandlerContext context, Object msg) throws Exception {
if (msg instanceof CmppSubmitResp) {
CmppSubmitResp resp = (CmppSubmitResp) msg;
log.info("-------------接收到短信提交应答-------------");
log.info("----自增id" + resp.getSequenceId());
log.info("----状态:" + resp.getResult());
log.info("----第一次响应:" + resp.getMsgId());
// 将封装好的任务扔到线程池中,执行即可
ThreadPoolExecutor cmppSubmitPool = (ThreadPoolExecutor) SpringUtil.getBeanByName("cmppSubmitPool");
cmppSubmitPool.execute(new SubmitRepoRunnable(resp));
}
if (msg instanceof CmppDeliver) {
CmppDeliver resp = (CmppDeliver) msg;
// 是否为状态报告 0非状态报告1状态报告
if (resp.getRegistered_Delivery() == 1) {
// 如果是状态报告的话
log.info("-------------状态报告---------------");
log.info("----第二次响应:" + resp.getMsg_Id_DELIVRD());
log.info("----手机号:" + resp.getDest_terminal_Id());
log.info("----状态:" + resp.getStat());
// 提前获取一手~~~
ThreadPoolExecutor cmppDeliverPool = (ThreadPoolExecutor) SpringUtil.getBeanByName("cmppDeliverPool");
cmppDeliverPool.execute(new DeliverRunnable(resp.getMsg_Id_DELIVRD(),resp.getStat()));
} else {
//用户回复会打印在这里
log.info("" + MsgUtils.bytesToLong(resp.getMsg_Id()));
log.info(resp.getSrc_terminal_Id());
log.info(resp.getMsg_Content());
}
}
}
}

@ -0,0 +1,45 @@
package com.mashibing.smsgateway.netty4;
import com.mashibing.smsgateway.netty4.entity.CmppActiveTest;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
/**
* Handler
*/
public class HeartHandler extends ChannelInboundHandlerAdapter {
private NettyClient client;
public HeartHandler(NettyClient client){
this.client=client;
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("初始化创建连接。。。");
super.channelActive(ctx);
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
IdleState state = event.state();
if (state == IdleState.WRITER_IDLE || state == IdleState.ALL_IDLE) {
client.submit(new CmppActiveTest());
System.out.println("心跳启动!!!!!!!!!!!!!!!!!!!!!!!!!!!!!");
}
} else {
super.userEventTriggered(ctx, evt);
}
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
client.reConnect(10);
}
}

@ -0,0 +1,136 @@
package com.mashibing.smsgateway.netty4;
import com.mashibing.smsgateway.netty4.entity.CmppConnect;
import com.mashibing.smsgateway.netty4.entity.CmppMessageHeader;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
/**
* netty
*/
public class NettyClient {
// 通道
private Channel channel;
// IP
private String host;
// 端口
private int port;
// 用户名
private String serviceId;
// 密码
private String pwd;
// 构建NettyClient
public NettyClient(String host, int port, String serviceId, String pwd) {
this.host = host;
this.port = port;
this.serviceId = serviceId;
this.pwd = pwd;
}
// 开启连接
public void start() {
try {
doConnect(host, port);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 在保持连接的状态下提交信息
public boolean submit(CmppMessageHeader submit) {
if (isActive()) {
channel.writeAndFlush(submit);
return true;
}
return false;
}
// 是否保持连接
public boolean isActive() {
if (channel == null) {
return false;
}
if (!channel.isOpen() || !channel.isActive() || !channel.isWritable()) {
//channel没开 或 没激活
return false;
}
return true;
}
/**
* channelInactive
*
* @param reConnect
*/
public void reConnect(int reConnect) {
int times = 0;
while (true && times < reConnect) {
try {
if (!isActive()) {
start();
} else {
try {
Thread.sleep(10 * 1000);
times++;
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} catch (Exception ex) {
System.out.println("尝试重连...:" + host + ":" + port + " / " + serviceId);
try {
Thread.sleep(10 * 1000);
times++;
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
/**
*
* @param host
* @param port
* @throws InterruptedException
*/
public void doConnect(final String host, final int port) throws InterruptedException {
//创建线程组 - 手动设置线程数,默认为cpu核心数2倍
EventLoopGroup eventLoopGroup = new NioEventLoopGroup(4);
//创建引导程序
Bootstrap bootstrap = new Bootstrap();
//保持长连接
bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
//将线程加入bootstrap
bootstrap.group(eventLoopGroup)
.remoteAddress(host, port)
//使用指定通道类
.channel(NioSocketChannel.class)
//设置日志
.handler(new LoggingHandler(LogLevel.INFO))
//重写通道初始化方法
.handler(new NettyClientInitializer(this));
ChannelFuture channelFuture = bootstrap.connect().sync();
channel = channelFuture.channel();
//账号登陆
CmppMessageHeader cmppConnect = new CmppConnect(serviceId, pwd);
channel.writeAndFlush(cmppConnect);
channelFuture.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
//关闭前阻塞
// channelFuture.channel().closeFuture().sync();
}
}

@ -0,0 +1,40 @@
package com.mashibing.smsgateway.netty4;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.timeout.IdleStateHandler;
import java.util.concurrent.TimeUnit;
/**
* Netty
*/
public class NettyClientInitializer extends ChannelInitializer<SocketChannel> {
private NettyClient client;
public NettyClientInitializer(NettyClient client) {
this.client = client;
}
@Override
protected void initChannel(SocketChannel ch) {
ChannelPipeline ph = ch.pipeline();
//长度编码器,防止粘包拆包
ph.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, -4, 0, true));
//心跳
//readerIdleTime:为读超时间(即测试端一定时间内未接收到被测试端消息)
//writerIdleTime:为写超时间(即测试端一定时间内向被测试端发送消息);
//allIdeTime所有类型的超时时间
ph.addLast("idleState handler", new IdleStateHandler(0, 0, 20, TimeUnit.SECONDS));
//心跳包
ph.addLast("heart handler", new HeartHandler(client));
//解析
ph.addLast("encoder", new CMPPEncoder());
ph.addLast("decoder", new CMPPDecoder());
//客户端的逻辑
ph.addLast("cmpp handler", new CMPPHandler());
}
}

@ -0,0 +1,26 @@
package com.mashibing.smsgateway.netty4;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class NettyStartCMPP {
//ip
public static String host = "127.0.0.1";
//端口
public static int port = 7890;
//账号
public static String serviceId = "laozheng";
//密码
public static String pwd = "JavaLaoZheng123!";
@Bean(initMethod = "start")
public NettyClient nettyClient() {
NettyClient cmppClient = new NettyClient(host, port, serviceId, pwd);
return cmppClient;
}
}

@ -0,0 +1,26 @@
package com.mashibing.smsgateway.netty4.entity;
import com.mashibing.smsgateway.netty4.utils.Command;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
public class CmppActiveTest extends CmppMessageHeader {
public CmppActiveTest() {
super(Command.CMPP_ACTIVE_TEST, Command.CMPP2_VERSION);
}
/**
*
*
* @return
*/
@Override
public byte[] toByteArray() {
ByteBuf buf = Unpooled.buffer(12);
buf.writeInt(12);
buf.writeInt(Command.CMPP_ACTIVE_TEST);
buf.writeInt(0);
return buf.array();
}
}

@ -0,0 +1,27 @@
package com.mashibing.smsgateway.netty4.entity;
import com.mashibing.smsgateway.netty4.utils.Command;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
public class CmppActiveTestResp extends CmppMessageHeader {
public CmppActiveTestResp() {
super(Command.CMPP_ACTIVE_TEST_RESP, Command.CMPP2_VERSION);
}
/**
*
*
* @return
*/
@Override
public byte[] toByteArray() {
ByteBuf buf = Unpooled.buffer(4 + 4 + 4 + 1);
buf.writeInt(4 + 4 + 4 + 1);
buf.writeInt(Command.CMPP_ACTIVE_TEST_RESP);
buf.writeInt(0);
buf.writeByte(0);
return buf.array();
}
}

@ -0,0 +1,44 @@
package com.mashibing.smsgateway.netty4.entity;
import com.mashibing.smsgateway.netty4.utils.Command;
import com.mashibing.smsgateway.netty4.utils.MsgUtils;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
/**
* CMPP
*/
public class CmppConnect extends CmppMessageHeader {
private String serviceId;
private String pwd;
public CmppConnect(String serviceId, String pwd) {
super(Command.CMPP_CONNECT, Command.CMPP2_VERSION);
this.serviceId = serviceId;
this.pwd = pwd;
}
@Override
public byte[] toByteArray() {
ByteBuf buf = Unpooled.buffer(4 + 4 + 4 + 6 + 16 + 1 + 4);
//Total_Length
buf.writeInt(4 + 4 + 4 + 6 + 16 + 1 + 4);
//Command_Id
buf.writeInt(Command.CMPP_CONNECT);
//Sequence_Id
buf.writeInt(MsgUtils.getSequence());
//sourceAddr
buf.writeBytes(MsgUtils.getLenBytes(serviceId, 6));
//authenticatorSource
buf.writeBytes(MsgUtils.getAuthenticatorSource(serviceId, pwd));
//version
buf.writeByte(1);
//timestamp
buf.writeInt(Integer.parseInt(MsgUtils.getTimestamp()));
return buf.array();
}
}

@ -0,0 +1,368 @@
package com.mashibing.smsgateway.netty4.entity;
import com.mashibing.smsgateway.netty4.utils.MsgUtils;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
public class CmppDeliver {
private byte[] Msg_Id = new byte[8];
private String Dest_Id;// 21 目的号码 String
private String Service_Id;// 10 业务标识 String
private byte TP_pid = 0;
private byte TP_udhi = 0;
private byte Msg_Fmt = 15;
private String Src_terminal_Id;// 21 string 源终端MSISDN号码
private byte Registered_Delivery = 0;// 是否为状态报告 0非状态报告1状态报告
private byte msg_Length;// 消息长度
private String msg_Content;// 消息内容
private long Reserved; // 保留字段
/**
* ISMGSPMsg_Content
*/
private long Msg_Id_DELIVRD;
private String Stat;
private String Submit_time;
private String Done_time;
private String Dest_terminal_Id;
private int SMSC_sequence;
private int result;// 解析结果
/**
* 2016930
*
* @throws IOException
*/
public CmppDeliver(byte[] data) {
System.arraycopy(data, 12, data, 0, data.length - 12);
ByteArrayInputStream bais = null;
DataInputStream dis = null;
try {
if (null != data && data.length > 0) {
bais = new ByteArrayInputStream(data);
dis = new DataInputStream(bais);
/**
* migid
*/
byte[] msgid_b = new byte[8];
dis.read(msgid_b);
this.Msg_Id = msgid_b;
this.Dest_Id = MsgUtils.readString(dis, 21, null);// 21bit
this.Service_Id = MsgUtils.readString(dis, 10, null);
this.TP_pid = dis.readByte();
this.TP_udhi = dis.readByte();
this.Msg_Fmt = dis.readByte();
// 源终端MSISDN号码状态报告时填为CMPP_SUBMIT消息的目的终端号码
this.Src_terminal_Id = MsgUtils.readString(dis, 21, this.Msg_Fmt == 8 ? "UTF-16BE" : "gb2312");
// 是否为应答信息 0非应答信息 1状态报告
this.Registered_Delivery = dis.readByte();
this.msg_Length = dis.readByte();
// 状态报告的 msg_Content_b 类型大小为协议固定8+7+10+10+21+4
byte[] msg_Content_b = new byte[this.Registered_Delivery == 0 ? this.msg_Length
: 8 + 7 + 10 + 10 + 21 + 4];
dis.read(msg_Content_b);
// 如果是状态报告
if (this.Registered_Delivery == 1) {
ByteArrayInputStream baisd = new ByteArrayInputStream(msg_Content_b);
DataInputStream disd = new DataInputStream(baisd);
// 开始解析 content中的字段
byte[] Msg_Id_DELIVRD_B = new byte[8];
disd.read(Msg_Id_DELIVRD_B);
this.Msg_Id_DELIVRD = MsgUtils.bytesToLong(Msg_Id_DELIVRD_B);
this.Msg_Id_DELIVRD = Math.abs(this.Msg_Id_DELIVRD);
this.Stat = MsgUtils.readString(disd, 7, this.Msg_Fmt == 8 ? "UTF-16BE" : "gb2312");
this.Submit_time = MsgUtils.readString(disd, 10, this.Msg_Fmt == 8 ? "UTF-16BE" : "gb2312");
this.Done_time = MsgUtils.readString(disd, 10, this.Msg_Fmt == 8 ? "UTF-16BE" : "gb2312");
this.Dest_terminal_Id = MsgUtils.readString(disd, 21, this.Msg_Fmt == 8 ? "UTF-16BE" : "gb2312");
this.SMSC_sequence = disd.readInt();
disd.close();
baisd.close();
this.result = 0;
} else {
this.msg_Content = new String(msg_Content_b, this.Msg_Fmt == 8 ? "UTF-16BE" : "gb2312");
this.Reserved = dis.readLong();// 保留项 暂无用
}
this.result = 0;
} else {
this.result = 1;
}
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
if (null != dis)
dis.close();
if (null != bais)
bais.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
/**
* @return msg_Id
*/
public byte[] getMsg_Id() {
return Msg_Id;
}
/**
* @return dest_Id
*/
public String getDest_Id() {
return Dest_Id;
}
/**
* @return service_Id
*/
public String getService_Id() {
return Service_Id;
}
/**
* @return tP_pid
*/
public byte getTP_pid() {
return TP_pid;
}
/**
* @return tP_udhi
*/
public byte getTP_udhi() {
return TP_udhi;
}
/**
* @return msg_Fmt
*/
public byte getMsg_Fmt() {
return Msg_Fmt;
}
/**
* @return src_terminal_Id
*/
public String getSrc_terminal_Id() {
return Src_terminal_Id;
}
/**
* @return registered_Delivery
*/
public byte getRegistered_Delivery() {
return Registered_Delivery;
}
/**
* @return msg_Length
*/
public byte getMsg_Length() {
return msg_Length;
}
/**
* @return msg_Content
*/
public String getMsg_Content() {
return msg_Content;
}
/**
* @return reserved
*/
public long getReserved() {
return Reserved;
}
/**
* @return msg_Id_DELIVRD
*/
public long getMsg_Id_DELIVRD() {
return Msg_Id_DELIVRD;
}
/**
* @return stat
*/
public String getStat() {
return Stat;
}
/**
* @return submit_time
*/
public String getSubmit_time() {
return Submit_time;
}
/**
* @return done_time
*/
public String getDone_time() {
return Done_time;
}
/**
* @return dest_terminal_Id
*/
public String getDest_terminal_Id() {
return Dest_terminal_Id;
}
/**
* @return sMSC_sequence
*/
public int getSMSC_sequence() {
return SMSC_sequence;
}
/**
* @return result
*/
public int getResult() {
return result;
}
/**
* @param msg_Id msg_Id
*/
public void setMsg_Id(byte[] msg_Id) {
Msg_Id = msg_Id;
}
/**
* @param dest_Id dest_Id
*/
public void setDest_Id(String dest_Id) {
Dest_Id = dest_Id;
}
/**
* @param service_Id service_Id
*/
public void setService_Id(String service_Id) {
Service_Id = service_Id;
}
/**
* @param tP_pid tP_pid
*/
public void setTP_pid(byte tP_pid) {
TP_pid = tP_pid;
}
/**
* @param tP_udhi tP_udhi
*/
public void setTP_udhi(byte tP_udhi) {
TP_udhi = tP_udhi;
}
/**
* @param msg_Fmt msg_Fmt
*/
public void setMsg_Fmt(byte msg_Fmt) {
Msg_Fmt = msg_Fmt;
}
/**
* @param src_terminal_Id src_terminal_Id
*/
public void setSrc_terminal_Id(String src_terminal_Id) {
Src_terminal_Id = src_terminal_Id;
}
/**
* @param registered_Delivery registered_Delivery
*/
public void setRegistered_Delivery(byte registered_Delivery) {
Registered_Delivery = registered_Delivery;
}
/**
* @param msg_Length msg_Length
*/
public void setMsg_Length(byte msg_Length) {
this.msg_Length = msg_Length;
}
/**
* @param msg_Content msg_Content
*/
public void setMsg_Content(String msg_Content) {
this.msg_Content = msg_Content;
}
/**
* @param reserved reserved
*/
public void setReserved(long reserved) {
Reserved = reserved;
}
/**
* @param msg_Id_DELIVRD msg_Id_DELIVRD
*/
public void setMsg_Id_DELIVRD(int msg_Id_DELIVRD) {
Msg_Id_DELIVRD = msg_Id_DELIVRD;
}
/**
* @param stat stat
*/
public void setStat(String stat) {
Stat = stat;
}
/**
* @param submit_time submit_time
*/
public void setSubmit_time(String submit_time) {
Submit_time = submit_time;
}
/**
* @param done_time done_time
*/
public void setDone_time(String done_time) {
Done_time = done_time;
}
/**
* @param dest_terminal_Id dest_terminal_Id
*/
public void setDest_terminal_Id(String dest_terminal_Id) {
Dest_terminal_Id = dest_terminal_Id;
}
/**
* @param sMSC_sequence sMSC_sequence
*/
public void setSMSC_sequence(int sMSC_sequence) {
SMSC_sequence = sMSC_sequence;
}
/**
* @param result result
*/
public void setResult(int result) {
this.result = result;
}
}

@ -0,0 +1,43 @@
package com.mashibing.smsgateway.netty4.entity;
import java.io.Serializable;
public abstract class CmppMessageHeader implements Serializable {
/**
* ()
*/
protected int totalLength;
/**
*
*/
protected int commandId;
/**
* 1使
*/
protected long sequenceId;
/**
* CMPP 2.0 3.0
*/
protected byte version;
protected CmppMessageHeader(int commandId, byte version) {
this.commandId = commandId;
this.version = version;
}
/**
*
*
* @return
*/
public abstract byte[] toByteArray();
}

@ -0,0 +1,156 @@
package com.mashibing.smsgateway.netty4.entity;
import com.mashibing.smsgateway.netty4.utils.Command;
import com.mashibing.smsgateway.netty4.utils.MsgUtils;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.io.UnsupportedEncodingException;
/**
* CMPP
*/
/**
* CMPP
*/
public class CmppSubmit extends CmppMessageHeader {
int msgId = 0;
byte pkTotal = 0;
byte pkNumber = 0;
byte registeredDelivery = 1;
byte msgLevel = 0;
String serviceId;// 10位长度
byte feeUserType = 3;
String feeTerminalId;// 21位长度
byte feeTerminalType = 0;
byte tp_pid = 0;
byte tp_udhi = 0;
/**
* 0ASCII
* 3
* 4
* 8UCS2
* 15GB
*/
byte msgFmt = 8;
String msgSrc;// 6位长度
/**
* 02
* 03
* 04
* 05SP
*/
String feeType;// 2位长度
String feeCode;// 6位长度
String vaildTime = "";// 17位长度
String atTime = "";// 17位长度
String srcId;// 21位长度
byte destUsrTl = 1;
String destTerminalId;// 21位长度
byte destTerminalType = 0;
byte msgLength; // 1位长度
byte[] msgContent;
String linkId = "";// 保留字
private int terminalIdLen;
private int linkIdLen;
private int submitExpMsgLen;
/**
*
* @param version CMPP2.0
* @param srcId
* @param SequenceId
* @param mobile
* @param content
*/
public CmppSubmit(byte version, String srcId, int SequenceId, String mobile, String content) {
super(Command.CMPP_SUBMIT, version);
if (version == Command.CMPP2_VERSION) {
terminalIdLen = 21;
linkIdLen = 8;
submitExpMsgLen = Command.CMPP2_SUBMIT_LEN_EXPMSGLEN;
} else {
terminalIdLen = 32;
linkIdLen = 20;
submitExpMsgLen = Command.CMPP3_SUBMIT_LEN_EXPMSGLEN;
}
this.feeTerminalId = "";
this.feeType = "02";
this.feeCode = "000000";
this.srcId = srcId + "1630";
this.msgFmt = (byte) 8;
this.linkId = "";
try {
msgContent = content.getBytes("UTF-16BE");
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
this.msgId = SequenceId;
this.sequenceId = this.msgId;
this.destTerminalId = mobile;
}
@Override
public byte[] toByteArray() {
ByteBuf buf = Unpooled.buffer(130 + 8 + ((byte) 1) * 21 + msgContent.length);
buf.writeInt(130 + 8 + ((byte) 1) * 21 + msgContent.length);
buf.writeInt(commandId);
buf.writeInt(msgId);
buf.writeLong(0);
buf.writeByte(0);
buf.writeByte(0);
buf.writeByte(1);
buf.writeByte(0);
buf.writeBytes(MsgUtils.getLenBytes(serviceId, 10));
buf.writeByte((byte) 2);
buf.writeBytes(MsgUtils.getLenBytes("", 21));
buf.writeByte(0);
buf.writeByte(0);
buf.writeByte(8);
buf.writeBytes(MsgUtils.getLenBytes(serviceId, 6));
buf.writeBytes(MsgUtils.getLenBytes(feeType, 2));
buf.writeBytes(MsgUtils.getLenBytes(feeCode, 6));
buf.writeBytes(MsgUtils.getLenBytes(vaildTime, 17));
buf.writeBytes(MsgUtils.getLenBytes(atTime, 17));
buf.writeBytes(MsgUtils.getLenBytes(srcId, 21));
buf.writeByte((byte) 1);
buf.writeBytes(MsgUtils.getLenBytes(destTerminalId, 21));
buf.writeByte(msgContent.length);
buf.writeBytes(msgContent);
buf.writeLong((long) 0);
return buf.array();
}
}

@ -0,0 +1,38 @@
package com.mashibing.smsgateway.netty4.entity;
import com.mashibing.smsgateway.netty4.utils.MsgUtils;
import org.apache.commons.lang3.ArrayUtils;
/**
* CMPP
*/
public class CmppSubmitResp {
private int sequenceId;
private int result;
private long msgId;
public CmppSubmitResp(byte[] bytes) {
this.sequenceId = MsgUtils.bytesToInt(ArrayUtils.subarray(bytes, 8, 12));
this.msgId = MsgUtils.bytesToLong(ArrayUtils.subarray(bytes, 12, 20));
this.msgId = Math.abs(this.msgId);
this.result = bytes[20];
}
public int getResult() {
return result;
}
public long getMsgId() {
return msgId;
}
public int getSequenceId() {
return sequenceId;
}
}

@ -0,0 +1,150 @@
package com.mashibing.smsgateway.netty4.utils;
/**
* CMPP2.0
*/
public interface Command {
/**
*
*/
public final int CMPP_CONNECT = 0x00000001;
/**
*
*/
public final int CMPP_CONNECT_RESP = 0x80000001;
/**
*
*/
public final int CMPP_TERMINATE = 0x00000002;
/**
*
*/
public final int CMPP_TERMINATE_RESP = 0x80000002;
/**
*
*/
public final int CMPP_SUBMIT = 0x00000004;
/**
*
*/
public final int CMPP_SUBMIT_RESP = 0x80000004;
/**
*
*/
public final int CMPP_DELIVER = 0x00000005;
/**
* +
*/
public final int CMPP_DELIVER_RESP = 0x80000005;
/**
*
*/
public final int CMPP_QUERY = 0x00000006;
/**
*
*/
public final int CMPP_QUERY_RESP = 0x80000006;
/**
*
*/
public final int CMPP_CANCEL = 0x00000007;
/**
*
*/
public final int CMPP_CANCEL_RESP = 0x80000007;
/**
*
*/
public final int CMPP_ACTIVE_TEST = 0x00000008;
/**
*
*/
public final int CMPP_ACTIVE_TEST_RESP = 0x80000008;
/**
*
*/
public final int CMPP_FWD = 0x00000009;
/**
*
*/
public final int CMPP_FWD_RESP = 0x80000009;
/**
* MT
*/
public final int CMPP_MT_ROUTE = 0x00000010;
/**
* MT
*/
public final int CMPP_MT_ROUTE_RESP = 0x80000010;
/**
* MO
*/
public final int CMPP_MO_ROUTE = 0x00000011;
/**
* MO
*/
public final int CMPP_MO_ROUTE_RESP = 0x80000011;
/**
* MT
*/
public final int CMPP_GET_MT_ROUTE = 0x00000012;
/**
* MT
*/
public final int CMPP_GET_MT_ROUTE_RESP = 0x80000012;
/**
* MT
*/
public final int CMPP_MT_ROUTE_UPDATE = 0x00000013;
/**
* MT
*/
public final int CMPP_MT_ROUTE_UPDATE_RESP = 0x80000013;
/**
* MO
*/
public final int CMPP_MO_ROUTE_UPDATE = 0x00000014;
/**
* MO
*/
public final int CMPP_MO_ROUTE_UPDATE_RESP = 0x80000014;
/**
* MT
*/
public final int CMPP_PUSH_MT_ROUTE_UPDATE = 0x00000015;
/**
* MT
*/
public final int CMPP_PUSH_MT_ROUTE_UPDATE_RESP = 0x80000015;
/**
* MO
*/
public final int CMPP_PUSH_MO_ROUTE_UPDATE = 0x00000016;
/**
* MO
*/
public final int CMPP_PUSH_MO_ROUTE_UPDATE_RESP = 0x80000016;
/**
* MO
*/
public final int CMPP_GET_MO_ROUTE = 0x00000017;
/**
* MO
*/
public final int CMPP_GET_MO_ROUTE_RESP = 0x80000017;
/** Cmpp协议版本字节常量 */
public static final byte CMPP2_VERSION = (byte) 32;
/** Cmpp协议版本字节常量 */
public static final byte CMPP3_VERSION = (byte) 48;
/** 下行长度(不包含短信内容长度) */
public static final int CMPP2_SUBMIT_LEN_EXPMSGLEN = 8 + 1 + 1 + 1 + 1 + 10 + 1 + 21 + 1 + 1 + 1
+ 6 + 2 + 6 + 17 + 17 + 21 + 1 + 1 + 8;
public static final int CMPP3_SUBMIT_LEN_EXPMSGLEN = 8 + 1 + 1 + 1 + 1 + 10 + 1 + 32 + 1 + 1
+ 1 + 1 + 6 + 2 + 6 + 17 + 17 + 21 + 1 + 1 + 1 + 20;
}

@ -0,0 +1,216 @@
package com.mashibing.smsgateway.netty4.utils;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
*
*/
public class MsgUtils {
//序列编号起始值(起始为随机数即可)
private static int sequenceId = Math.abs(new Long(System.currentTimeMillis()).intValue());
//序列峰值
private static int MAX_VALUE = Integer.MAX_VALUE / 2;
/**
*
*/
public static synchronized int getSequence() {
++sequenceId;
if (sequenceId > MAX_VALUE) {
sequenceId = Math.abs(new Long(System.currentTimeMillis()).intValue());
}
return sequenceId;
}
/**
* ,,MMDDHHMMSS10 <EFBFBD>?
*/
public static String getTimestamp() {
DateFormat format = new SimpleDateFormat("MMddhhmmss");
return format.format(new Date());
}
/**
* <EFBFBD>?<EFBFBD>?<EFBFBD>MD5 hash
* AuthenticatorSource =
* MD5Source_Addr+9 <EFBFBD>?0 +shared secret+timestamp<EFBFBD>?
* Shared secret <EFBFBD>?timestampMMDDHHMMSS10<EFBFBD>??
*
* @return
*/
public static byte[] getAuthenticatorSource(String spId, String secret) {
try {
MessageDigest md5 = MessageDigest.getInstance("MD5");
byte[] data = (spId + "\0\0\0\0\0\0\0\0\0" + secret + MsgUtils.getTimestamp()).getBytes();
return md5.digest(data);
} catch (NoSuchAlgorithmException e) {
System.out.println("SP链接到ISMG拼接AuthenticatorSource失败" + e.getMessage());
return null;
}
}
/**
* 0
*
* @param dous:<EFBFBD>?
* @param s:<EFBFBD>?
* @param len:,<EFBFBD>?0
*/
public static void writeString(DataOutputStream dous, String s, int len) {
try {
byte[] data = s.getBytes("gb2312");
if (data.length > len) {
System.out.println("向流中写入的字符串超长!要写" + len + " 字符串是:" + s);
}
int srcLen = data.length;
dous.write(data);
while (srcLen < len) {
dous.write('\0');
srcLen++;
}
} catch (IOException e) {
System.out.println("向流中写入指定字节长度的字符串失败:" + e.getMessage());
}
}
public static byte[] getLenBytes(String s, int len) {
if (s == null) {
s = "";
}
byte[] rb = new byte[len];
byte[] sb = s.getBytes();
for (int i = sb.length; i < rb.length; i++) {
rb[i] = 0;
}
if (sb.length == len) {
return sb;
} else {
for (int i = 0; i < sb.length && i < len; i++) {
rb[i] = sb[i];
}
return rb;
}
}
/**
*
*
* @param ins:<EFBFBD>?
* @param len:<EFBFBD>?
* @return:<EFBFBD>?
*/
public static String readString(java.io.DataInputStream ins, int len, String charset) {
byte[] b = new byte[len];
try {
ins.read(b);
String s;
if (charset == null)
s = new String(b);
else
s = new String(b, charset);
s = s.trim();
return s;
} catch (IOException e) {
return "";
}
}
/**
*
*
* @param msg
* @param start
* @param end
* @return
*/
public static byte[] getMsgBytes(byte[] msg, int start, int end) {
byte[] msgByte = new byte[end - start];
int j = 0;
for (int i = start; i < end; i++) {
msgByte[j] = msg[i];
j++;
}
return msgByte;
}
/**
* UCS2
*
* @param src UCS2
* @return UTF-16BE<EFBFBD>?
*/
public static String DecodeUCS2(String src) {
byte[] bytes = new byte[src.length() / 2];
for (int i = 0; i < src.length(); i += 2) {
bytes[i / 2] = (byte) (Integer.parseInt(src.substring(i, i + 2), 16));
}
String reValue = "";
try {
reValue = new String(bytes, "UTF-16BE");
} catch (UnsupportedEncodingException e) {
reValue = "";
}
return reValue;
}
/**
* byte[] long
* 2016930
*/
public static long bytesToLong(byte[] b) {
long temp = 0;
long res = 0;
for (int i = 0; i < 8; i++) {
temp = b[i] & 0xff;
temp <<= 8 * i;
res |= temp;
}
return res;
}
public static int bytesToInt(byte[] b) {
return b[3] & 0xFF |
(b[2] & 0xFF) << 8 |
(b[1] & 0xFF) << 16 |
(b[0] & 0xFF) << 24;
}
/**
* UCS2
*
* @param src UTF-16BE<EFBFBD>?
* @return UCS2<EFBFBD>?
*/
public static String EncodeUCS2(String src) {
byte[] bytes;
try {
bytes = src.getBytes("UTF-16BE");
} catch (UnsupportedEncodingException e) {
bytes = new byte[0];
}
StringBuffer reValue = new StringBuffer();
StringBuffer tem = new StringBuffer();
for (int i = 0; i < bytes.length; i++) {
tem.delete(0, tem.length());
tem.append(Integer.toHexString(bytes[i] & 0xFF));
if (tem.length() == 1) {
tem.insert(0, '0');
}
reValue.append(tem);
}
return reValue.toString().toUpperCase();
}
}

@ -0,0 +1,74 @@
package com.mashibing.smsgateway.runnable;
import com.mashibing.common.constant.CacheConstant;
import com.mashibing.common.constant.RabbitMQConstants;
import com.mashibing.common.constant.SmsConstant;
import com.mashibing.common.model.StandardReport;
import com.mashibing.common.model.StandardSubmit;
import com.mashibing.common.util.CMPP2DeliverUtil;
import com.mashibing.common.util.CMPP2ResultUtil;
import com.mashibing.common.util.CMPPDeliverMapUtil;
import com.mashibing.common.util.CMPPSubmitRepoMapUtil;
import com.mashibing.smsgateway.client.BeaconCacheClient;
import com.mashibing.smsgateway.netty4.entity.CmppSubmitResp;
import com.mashibing.smsgateway.util.SpringUtil;
import org.apache.commons.lang.StringUtils;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeanUtils;
/**
* @author zjw
* @description
*/
public class DeliverRunnable implements Runnable {
private RabbitTemplate rabbitTemplate = SpringUtil.getBeanByClass(RabbitTemplate.class);
private BeaconCacheClient cacheClient = SpringUtil.getBeanByClass(BeaconCacheClient.class);
private final String DELIVRD = "DELIVRD";
private long msgId;
private String stat;
public DeliverRunnable(long msgId, String stat) {
this.msgId = msgId;
this.stat = stat;
}
@Override
public void run() {
//1、基于msgId拿到临时存储的Report对象
StandardReport report = CMPPDeliverMapUtil.remove(msgId + "");
//2、确认当前短信发送的最终状态
if(!StringUtils.isEmpty(stat) && stat.equals(DELIVRD)){
// 短信发送成功
report.setReportState(SmsConstant.REPORT_SUCCESS);
}else{
// 短信发送失败
report.setReportState(SmsConstant.REPORT_FAIL);
report.setErrorMsg(CMPP2DeliverUtil.getResultMessage(stat));
}
//3、客户状态报告推送让网关模块查询缓存当前客户是否需要状态报告推送
// 查询当前客户的isCallback
Integer isCallback = cacheClient.hgetInteger(CacheConstant.CLIENT_BUSINESS + report.getApikey(), "isCallback");
if(isCallback == 1){
// 如果需要回调,再查询客户的回调地址
String callbackUrl = cacheClient.hget(CacheConstant.CLIENT_BUSINESS + report.getApikey(), "callbackUrl");
// 如果回调地址不为空。
if(!StringUtils.isEmpty(callbackUrl)){
// 封装客户的报告推送的信息开始封装StandardReport
report.setIsCallback(isCallback);
report.setCallbackUrl(callbackUrl);
// 发送消息到RabbitMQ
rabbitTemplate.convertAndSend(RabbitMQConstants.SMS_PUSH_REPORT,report);
}
}
//4、发送消息让搜索模块对之前写入的信息做修改这里需要做一个死信队列延迟10s发送修改es信息的消息
}
}

@ -0,0 +1,55 @@
package com.mashibing.smsgateway.runnable;
import com.mashibing.common.constant.RabbitMQConstants;
import com.mashibing.common.constant.SmsConstant;
import com.mashibing.common.model.StandardReport;
import com.mashibing.common.model.StandardSubmit;
import com.mashibing.common.util.CMPP2ResultUtil;
import com.mashibing.common.util.CMPPDeliverMapUtil;
import com.mashibing.common.util.CMPPSubmitRepoMapUtil;
import com.mashibing.smsgateway.netty4.entity.CmppSubmitResp;
import com.mashibing.smsgateway.util.SpringUtil;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeanUtils;
/**
* @author zjw
* @description
*/
public class SubmitRepoRunnable implements Runnable {
private RabbitTemplate rabbitTemplate = SpringUtil.getBeanByClass(RabbitTemplate.class);
private CmppSubmitResp submitResp;
private final int OK = 0;
public SubmitRepoRunnable(CmppSubmitResp submitResp) {
this.submitResp = submitResp;
}
@Override
public void run() {
StandardReport report = null;
//1、拿到自增ID并且从ConcurrentHashMap中获取到存储的submit
StandardSubmit submit = CMPPSubmitRepoMapUtil.remove(submitResp.getSequenceId());
//2、根据运营商返回的result确认短信状态并且封装submit
int result = submitResp.getResult();
if (result != OK) {
// 到这,说明运营商的提交应答中回馈的失败的情况
String resultMessage = CMPP2ResultUtil.getResultMessage(result);
submit.setReportState(SmsConstant.REPORT_FAIL);
submit.setErrorMsg(resultMessage);
} else {
// 如果没进到if中说明运营商已经正常的接收了发送短信的任务这边完成3操作
//3、将submit封装为Report临时存储以便运营商返回状态码时可以再次获取到信息
// 这里没有对其他信息做封装
report = new StandardReport();
BeanUtils.copyProperties(submit, report);
CMPPDeliverMapUtil.put(submitResp.getMsgId() + "",report);
}
//4、将封装好的submit直接扔RabbitMQ中让搜索模块记录信息
rabbitTemplate.convertAndSend(RabbitMQConstants.SMS_WRITE_LOG,submit);
}
}

@ -0,0 +1,29 @@
package com.mashibing.smsgateway.util;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;
/**
* @author zjw
* @description
*/
@Component
public class SpringUtil implements ApplicationContextAware {
private static ApplicationContext applicationContext;
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
SpringUtil.applicationContext = applicationContext;
}
public static Object getBeanByName(String beanName){
return SpringUtil.applicationContext.getBean(beanName);
}
public static <T> T getBeanByClass(Class<T> clazz){
return SpringUtil.applicationContext.getBean(clazz);
}
}

@ -14,3 +14,4 @@ spring:
config:
server-addr: 114.116.226.76:8848
file-extension: yml

@ -1,21 +1,12 @@
package com.mashibing.strategy.filter.impl;
import com.mashibing.common.constant.CacheConstant;
import com.mashibing.common.model.StandardSubmit;
import com.mashibing.strategy.client.BeaconCacheClient;
import com.mashibing.strategy.filter.StrategyFilter;
import com.mashibing.strategy.util.DFAUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.wltea.analyzer.core.IKSegmenter;
import org.wltea.analyzer.core.Lexeme;
import java.io.IOException;
import java.io.StringReader;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
/**
*

@ -1,26 +1,18 @@
package com.mashibing.strategy.filter.impl;
import com.mashibing.common.constant.CacheConstant;
import com.mashibing.common.constant.RabbitMQConstants;
import com.mashibing.common.constant.SmsConstant;
import com.mashibing.common.enums.ExceptionEnums;
import com.mashibing.common.exception.StrategyException;
import com.mashibing.common.model.StandardReport;
import com.mashibing.common.model.StandardSubmit;
import com.mashibing.strategy.client.BeaconCacheClient;
import com.mashibing.strategy.filter.StrategyFilter;
import com.mashibing.strategy.util.DFAUtil;
import com.mashibing.strategy.util.ErrorSendMsgUtil;
import com.mashibing.strategy.util.HutoolDFAUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.List;
import java.util.Set;
/**
*

@ -9,7 +9,6 @@ import com.mashibing.strategy.client.BeaconCacheClient;
import com.mashibing.strategy.filter.StrategyFilter;
import com.mashibing.strategy.util.ErrorSendMsgUtil;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.weaver.tools.cache.CacheStatistics;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@ -10,7 +10,6 @@ import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import rx.BackpressureOverflow;
import java.io.IOException;

@ -3,7 +3,6 @@ package com.mashibing.strategy.util;
import com.mashibing.common.constant.CacheConstant;
import com.mashibing.common.constant.RabbitMQConstants;
import com.mashibing.common.constant.SmsConstant;
import com.mashibing.common.enums.ExceptionEnums;
import com.mashibing.common.model.StandardReport;
import com.mashibing.common.model.StandardSubmit;
import com.mashibing.strategy.client.BeaconCacheClient;
@ -13,8 +12,6 @@ import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.List;
/**
* @author zjw
* @description

Loading…
Cancel
Save