路由策略,完成整体的通道选择和消息发送~

master
Administrator 3 years ago
parent b51e28ec74
commit a2bbb59feb

@ -58,5 +58,14 @@ public interface CacheConstant {
*/
String LIMIT_HOURS = "limit:hours:";
/**
*
*/
String CLIENT_CHANNEL = "client_channel:";
/**
*
*/
String CHANNEL = "channel:";
}

@ -28,5 +28,10 @@ public interface RabbitMQConstants {
*/
String SMS_PUSH_REPORT = "sms_push_report_topic";
/**
*
*/
String SMS_GATEWAY = "sms_gateway_topic_";
}

@ -8,7 +8,7 @@ import lombok.Getter;
*/
@Getter
public enum ExceptionEnums {
UNKNOWN_ERROR(-999,"未知错误!"),
ERROR_APIKEY(-1,"非法的apikey"),
IP_NOT_WHITE(-2,"请求的ip不在白名单内"),
ERROR_SIGN(-3,"无可用签名"),
@ -23,6 +23,7 @@ public enum ExceptionEnums {
BLACK_CLIENT(-15,"当前手机号为客户黑名单!"),
ONE_MINUTE_LIMIT(-16,"1分钟限流规则生效无法发送短信"),
ONE_HOUR_LIMIT(-17,"1小时限流规则生效无法发送短信"),
NO_CHANNEL(-18,"没有选择到合适的通道!"),
;
private Integer code;

@ -132,6 +132,7 @@ public class StandardSubmit implements Serializable {
*/
private Long oneHourLimitMilli;
// 后续再做封装~~~~

@ -3,6 +3,7 @@ package com.mashibing.strategy.client;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.*;
import java.util.Map;
import java.util.Set;
/**
@ -27,6 +28,9 @@ public interface BeaconCacheClient {
@GetMapping("/cache/smember/{key}")
Set<String> smember(@PathVariable(value = "key")String key);
@GetMapping("/cache/smember/{key}")
Set<Map> smemberMap(@PathVariable(value = "key")String key);
@PostMapping(value = "/cache/zadd/{key}/{score}/{member}")
Boolean zadd(@PathVariable(value = "key")String key,
@PathVariable(value = "score")Long score,
@ -44,4 +48,7 @@ public interface BeaconCacheClient {
Long hIncrBy(@PathVariable(value = "key") String key,
@PathVariable(value = "field") String field,
@PathVariable(value = "delta") Long delta);
@GetMapping("/cache/hgetall/{key}")
Map hGetAll(@PathVariable(value = "key")String key);
}

@ -1,10 +1,27 @@
package com.mashibing.strategy.filter.impl;
import com.mashibing.common.constant.CacheConstant;
import com.mashibing.common.constant.RabbitMQConstants;
import com.mashibing.common.enums.ExceptionEnums;
import com.mashibing.common.exception.StrategyException;
import com.mashibing.common.model.StandardSubmit;
import com.mashibing.strategy.client.BeaconCacheClient;
import com.mashibing.strategy.filter.StrategyFilter;
import com.mashibing.strategy.util.ChannelTransferUtil;
import com.mashibing.strategy.util.ErrorSendMsgUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.Comparator;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
/**
*
* @author zjw
@ -13,8 +30,100 @@ import org.springframework.stereotype.Service;
@Service(value = "route")
@Slf4j
public class RouteStrategyFilter implements StrategyFilter {
@Autowired
private BeaconCacheClient cacheClient;
@Autowired
private ErrorSendMsgUtil sendMsgUtil;
@Autowired
private AmqpAdmin amqpAdmin;
@Autowired
private RabbitTemplate rabbitTemplate;
@Override
public void strategy(StandardSubmit submit) {
log.info("【策略模块-路由策略】 校验ing…………");
//1、拿到客户id
Long clientId = submit.getClientId();
//2、基于redis获取当前客户绑定的所有通道信息
Set<Map> clientChannels = cacheClient.smemberMap(CacheConstant.CLIENT_CHANNEL + clientId);
//3、将获取到的客户通道信息根据权重做好排序
TreeSet<Map> clientWeightChannels = new TreeSet<>(new Comparator<Map>() {
@Override
public int compare(Map o1, Map o2) {
int o1Weight = Integer.parseInt(o1.get("clientChannelWeight") + "");
int o2Weight = Integer.parseInt(o2.get("clientChannelWeight") + "");
return o2Weight - o1Weight;
}
});
clientWeightChannels.addAll(clientChannels);
boolean ok = false;
Map channel = null;
Map clientChannel = null;
//4、基于排好序的通道选择权重更高的
for (Map clientWeightChannel : clientWeightChannels) {
//5、如果客户和通道的绑定关系可用直接去基于Redis查询具体的通道信息
if((int)(clientWeightChannel.get("isAvailable")) != 0){
// 当前关系不可用,直接进行下次循环,选择权重相对更低一点的
continue;
}
//6、如果通道信息查询后判断通道睡否可用其次运营商可以匹配。
channel = cacheClient.hGetAll(CacheConstant.CHANNEL + clientWeightChannel.get("channelId"));
if((int)(channel.get("isAvailable")) != 0){
// 当前通道不可用,选择权重更低的通道~
continue;
}
// 获取通道的通讯方式
Integer channelType = (Integer) channel.get("channelType");
if (channelType != 0 && submit.getOperatorId() != channelType){
// 通道不是全网通,并且和当前手机号运营商不匹配
continue;
}
//7、如果后期涉及到的通道的转换这里留一个口子
Map transferChannel = ChannelTransferUtil.transfer(submit, channel);
// 找到可以使用的通道了
ok = true;
clientChannel = clientWeightChannel;
break;
}
if(!ok){
log.info("【策略模块-路由策略】 没有选择到可用的通道!!");
submit.setErrorMsg(ExceptionEnums.NO_CHANNEL.getMsg());
sendMsgUtil.sendWriteLog(submit);
sendMsgUtil.sendPushReport(submit);
throw new StrategyException(ExceptionEnums.NO_CHANNEL);
}
//8、基于选择的通道封装submit的信息
submit.setChannelId(Long.parseLong(channel.get("id") + ""));
submit.setSrcNumber("" + channel.get("channelNumber") + clientChannel.get("clientChannelNumber"));
try {
//9、声明好队列名称并构建队列
String queueName = RabbitMQConstants.SMS_GATEWAY + submit.getChannelId();
amqpAdmin.declareQueue(QueueBuilder.durable(queueName).build());
//10、发送消息到声明好的队列中
rabbitTemplate.convertAndSend(queueName,submit);
} catch (AmqpException e) {
log.info("【策略模块-路由策略】 声明通道对应队列以及发送消息时出现了问题!");
submit.setErrorMsg(e.getMessage());
sendMsgUtil.sendWriteLog(submit);
sendMsgUtil.sendPushReport(submit);
throw new StrategyException(e.getMessage(),ExceptionEnums.UNKNOWN_ERROR.getCode());
}
}
}

@ -0,0 +1,27 @@
package com.mashibing.strategy.util;
import com.mashibing.common.model.StandardSubmit;
import java.util.Map;
/**
*
* @author zjw
* @description
*/
public class ChannelTransferUtil {
/**
* ~~
* @param submit
* @param channel
* @return
*/
public static Map transfer(StandardSubmit submit,Map channel){
// do nothing~
return channel;
}
}
Loading…
Cancel
Save