diff --git a/beacon-common/src/main/java/com/mashibing/common/constant/CacheConstant.java b/beacon-common/src/main/java/com/mashibing/common/constant/CacheConstant.java index 5990309..2520f8c 100644 --- a/beacon-common/src/main/java/com/mashibing/common/constant/CacheConstant.java +++ b/beacon-common/src/main/java/com/mashibing/common/constant/CacheConstant.java @@ -58,5 +58,14 @@ public interface CacheConstant { */ String LIMIT_HOURS = "limit:hours:"; + /** + * 客户和通道绑定的信息 + */ + String CLIENT_CHANNEL = "client_channel:"; + + /** + * 通道信息的前缀 + */ + String CHANNEL = "channel:"; } diff --git a/beacon-common/src/main/java/com/mashibing/common/constant/RabbitMQConstants.java b/beacon-common/src/main/java/com/mashibing/common/constant/RabbitMQConstants.java index 2b77fbc..513f0b8 100644 --- a/beacon-common/src/main/java/com/mashibing/common/constant/RabbitMQConstants.java +++ b/beacon-common/src/main/java/com/mashibing/common/constant/RabbitMQConstants.java @@ -28,5 +28,10 @@ public interface RabbitMQConstants { */ String SMS_PUSH_REPORT = "sms_push_report_topic"; + /** + * 策略模块推送消息到短信网关模块的队列前缀名称 + */ + String SMS_GATEWAY = "sms_gateway_topic_"; + } diff --git a/beacon-common/src/main/java/com/mashibing/common/enums/ExceptionEnums.java b/beacon-common/src/main/java/com/mashibing/common/enums/ExceptionEnums.java index e8c3cf9..23b337c 100644 --- a/beacon-common/src/main/java/com/mashibing/common/enums/ExceptionEnums.java +++ b/beacon-common/src/main/java/com/mashibing/common/enums/ExceptionEnums.java @@ -8,7 +8,7 @@ import lombok.Getter; */ @Getter public enum ExceptionEnums { - + UNKNOWN_ERROR(-999,"未知错误!"), ERROR_APIKEY(-1,"非法的apikey"), IP_NOT_WHITE(-2,"请求的ip不在白名单内"), ERROR_SIGN(-3,"无可用签名"), @@ -23,6 +23,7 @@ public enum ExceptionEnums { BLACK_CLIENT(-15,"当前手机号为客户黑名单!"), ONE_MINUTE_LIMIT(-16,"1分钟限流规则生效,无法发送短信"), ONE_HOUR_LIMIT(-17,"1小时限流规则生效,无法发送短信"), + NO_CHANNEL(-18,"没有选择到合适的通道!"), ; private Integer code; diff --git a/beacon-common/src/main/java/com/mashibing/common/model/StandardSubmit.java b/beacon-common/src/main/java/com/mashibing/common/model/StandardSubmit.java index 8ac4f25..108a8fd 100644 --- a/beacon-common/src/main/java/com/mashibing/common/model/StandardSubmit.java +++ b/beacon-common/src/main/java/com/mashibing/common/model/StandardSubmit.java @@ -132,6 +132,7 @@ public class StandardSubmit implements Serializable { */ private Long oneHourLimitMilli; + // 后续再做封装~~~~ diff --git a/beacon-strategy/src/main/java/com/mashibing/strategy/client/BeaconCacheClient.java b/beacon-strategy/src/main/java/com/mashibing/strategy/client/BeaconCacheClient.java index 34acf0c..7f549d7 100644 --- a/beacon-strategy/src/main/java/com/mashibing/strategy/client/BeaconCacheClient.java +++ b/beacon-strategy/src/main/java/com/mashibing/strategy/client/BeaconCacheClient.java @@ -3,6 +3,7 @@ package com.mashibing.strategy.client; import org.springframework.cloud.openfeign.FeignClient; import org.springframework.web.bind.annotation.*; +import java.util.Map; import java.util.Set; /** @@ -27,6 +28,9 @@ public interface BeaconCacheClient { @GetMapping("/cache/smember/{key}") Set smember(@PathVariable(value = "key")String key); + @GetMapping("/cache/smember/{key}") + Set smemberMap(@PathVariable(value = "key")String key); + @PostMapping(value = "/cache/zadd/{key}/{score}/{member}") Boolean zadd(@PathVariable(value = "key")String key, @PathVariable(value = "score")Long score, @@ -44,4 +48,7 @@ public interface BeaconCacheClient { Long hIncrBy(@PathVariable(value = "key") String key, @PathVariable(value = "field") String field, @PathVariable(value = "delta") Long delta); + + @GetMapping("/cache/hgetall/{key}") + Map hGetAll(@PathVariable(value = "key")String key); } diff --git a/beacon-strategy/src/main/java/com/mashibing/strategy/filter/impl/RouteStrategyFilter.java b/beacon-strategy/src/main/java/com/mashibing/strategy/filter/impl/RouteStrategyFilter.java index 7497e20..0c087a7 100644 --- a/beacon-strategy/src/main/java/com/mashibing/strategy/filter/impl/RouteStrategyFilter.java +++ b/beacon-strategy/src/main/java/com/mashibing/strategy/filter/impl/RouteStrategyFilter.java @@ -1,10 +1,27 @@ package com.mashibing.strategy.filter.impl; +import com.mashibing.common.constant.CacheConstant; +import com.mashibing.common.constant.RabbitMQConstants; +import com.mashibing.common.enums.ExceptionEnums; +import com.mashibing.common.exception.StrategyException; import com.mashibing.common.model.StandardSubmit; +import com.mashibing.strategy.client.BeaconCacheClient; import com.mashibing.strategy.filter.StrategyFilter; +import com.mashibing.strategy.util.ChannelTransferUtil; +import com.mashibing.strategy.util.ErrorSendMsgUtil; import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.AmqpException; +import org.springframework.amqp.core.AmqpAdmin; +import org.springframework.amqp.core.QueueBuilder; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import java.util.Comparator; +import java.util.Map; +import java.util.Set; +import java.util.TreeSet; + /** * 路由策略,选择合适的运营商通道 * @author zjw @@ -13,8 +30,100 @@ import org.springframework.stereotype.Service; @Service(value = "route") @Slf4j public class RouteStrategyFilter implements StrategyFilter { + + @Autowired + private BeaconCacheClient cacheClient; + + @Autowired + private ErrorSendMsgUtil sendMsgUtil; + + @Autowired + private AmqpAdmin amqpAdmin; + + @Autowired + private RabbitTemplate rabbitTemplate; + @Override public void strategy(StandardSubmit submit) { log.info("【策略模块-路由策略】 校验ing…………"); + //1、拿到客户id + Long clientId = submit.getClientId(); + + //2、基于redis获取当前客户绑定的所有通道信息 + Set clientChannels = cacheClient.smemberMap(CacheConstant.CLIENT_CHANNEL + clientId); + + //3、将获取到的客户通道信息根据权重做好排序 + TreeSet clientWeightChannels = new TreeSet<>(new Comparator() { + @Override + public int compare(Map o1, Map o2) { + int o1Weight = Integer.parseInt(o1.get("clientChannelWeight") + ""); + int o2Weight = Integer.parseInt(o2.get("clientChannelWeight") + ""); + return o2Weight - o1Weight; + } + }); + clientWeightChannels.addAll(clientChannels); + + boolean ok = false; + Map channel = null; + Map clientChannel = null; + //4、基于排好序的通道选择,权重更高的 + for (Map clientWeightChannel : clientWeightChannels) { + //5、如果客户和通道的绑定关系可用,直接去基于Redis查询具体的通道信息 + if((int)(clientWeightChannel.get("isAvailable")) != 0){ + // 当前关系不可用,直接进行下次循环,选择权重相对更低一点的 + continue; + } + + //6、如果通道信息查询后,判断通道睡否可用,其次运营商可以匹配。 + channel = cacheClient.hGetAll(CacheConstant.CHANNEL + clientWeightChannel.get("channelId")); + if((int)(channel.get("isAvailable")) != 0){ + // 当前通道不可用,选择权重更低的通道~ + continue; + } + // 获取通道的通讯方式 + Integer channelType = (Integer) channel.get("channelType"); + if (channelType != 0 && submit.getOperatorId() != channelType){ + // 通道不是全网通,并且和当前手机号运营商不匹配 + continue; + } + + //7、如果后期涉及到的通道的转换,这里留一个口子 + Map transferChannel = ChannelTransferUtil.transfer(submit, channel); + + // 找到可以使用的通道了 + ok = true; + clientChannel = clientWeightChannel; + break; + } + + if(!ok){ + log.info("【策略模块-路由策略】 没有选择到可用的通道!!"); + submit.setErrorMsg(ExceptionEnums.NO_CHANNEL.getMsg()); + sendMsgUtil.sendWriteLog(submit); + sendMsgUtil.sendPushReport(submit); + throw new StrategyException(ExceptionEnums.NO_CHANNEL); + } + + + //8、基于选择的通道封装submit的信息 + submit.setChannelId(Long.parseLong(channel.get("id") + "")); + submit.setSrcNumber("" + channel.get("channelNumber") + clientChannel.get("clientChannelNumber")); + + try { + //9、声明好队列名称,并构建队列 + String queueName = RabbitMQConstants.SMS_GATEWAY + submit.getChannelId(); + amqpAdmin.declareQueue(QueueBuilder.durable(queueName).build()); + + //10、发送消息到声明好的队列中 + rabbitTemplate.convertAndSend(queueName,submit); + } catch (AmqpException e) { + log.info("【策略模块-路由策略】 声明通道对应队列以及发送消息时出现了问题!"); + submit.setErrorMsg(e.getMessage()); + sendMsgUtil.sendWriteLog(submit); + sendMsgUtil.sendPushReport(submit); + throw new StrategyException(e.getMessage(),ExceptionEnums.UNKNOWN_ERROR.getCode()); + } + + } } diff --git a/beacon-strategy/src/main/java/com/mashibing/strategy/util/ChannelTransferUtil.java b/beacon-strategy/src/main/java/com/mashibing/strategy/util/ChannelTransferUtil.java new file mode 100644 index 0000000..db575a1 --- /dev/null +++ b/beacon-strategy/src/main/java/com/mashibing/strategy/util/ChannelTransferUtil.java @@ -0,0 +1,27 @@ +package com.mashibing.strategy.util; + +import com.mashibing.common.model.StandardSubmit; + +import java.util.Map; + +/** + * 通道转换留的口子 + * @author zjw + * @description + */ +public class ChannelTransferUtil { + + + /** + * 暂时啥都不做~~ + * @param submit + * @param channel + * @return + */ + public static Map transfer(StandardSubmit submit,Map channel){ + // do nothing~ + return channel; + } + + +}