diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/enums/LoadBalancerStrategy.java b/austin-handler/src/main/java/com/java3y/austin/handler/enums/LoadBalancerStrategy.java new file mode 100644 index 0000000..187d0b5 --- /dev/null +++ b/austin-handler/src/main/java/com/java3y/austin/handler/enums/LoadBalancerStrategy.java @@ -0,0 +1,41 @@ +package com.java3y.austin.handler.enums; + +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.ToString; + +/** + * @Author Gavin + * @Date 2024/9/14 + */ +@Getter +@ToString +@AllArgsConstructor +public class LoadBalancerStrategy { + + /** + * 随机算法 + */ + public static final String SERVICE_LOAD_BALANCER_RANDOM = "random"; + + /** + * 加权随机算法 + */ + public static final String SERVICE_LOAD_BALANCER_RANDOM_WEIGHT_ENHANCED = "random_weight"; + + /** + * 哈希算法 + */ + public static final String SERVICE_LOAD_BALANCER_HASH = "hash"; + + /** + * 轮询算法 + */ + public static final String SERVICE_LOAD_BALANCER_ROBIN = "robin"; + + /** + * 加权轮询算法 + */ + public static final String SERVICE_LOAD_BALANCER_ROBIN_WEIGHT = "robin_weight"; + +} diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/handler/impl/SmsHandler.java b/austin-handler/src/main/java/com/java3y/austin/handler/handler/impl/SmsHandler.java index ee0369f..bf5b639 100644 --- a/austin-handler/src/main/java/com/java3y/austin/handler/handler/impl/SmsHandler.java +++ b/austin-handler/src/main/java/com/java3y/austin/handler/handler/impl/SmsHandler.java @@ -13,7 +13,9 @@ import com.java3y.austin.common.dto.model.SmsContentModel; import com.java3y.austin.common.enums.ChannelType; import com.java3y.austin.handler.domain.sms.MessageTypeSmsConfig; import com.java3y.austin.handler.domain.sms.SmsParam; +import com.java3y.austin.handler.enums.LoadBalancerStrategy; import com.java3y.austin.handler.handler.BaseHandler; +import com.java3y.austin.handler.loadbalance.ServiceLoadBalancerFactory; import com.java3y.austin.handler.script.SmsScript; import com.java3y.austin.support.dao.SmsRecordDao; import com.java3y.austin.support.domain.SmsRecord; @@ -24,7 +26,6 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ApplicationContext; import org.springframework.stereotype.Component; -import java.security.SecureRandom; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -44,10 +45,11 @@ public class SmsHandler extends BaseHandler{ private static final Integer AUTO_FLOW_RULE = 0; private static final String FLOW_KEY = "msgTypeSmsConfig"; private static final String FLOW_KEY_PREFIX = "message_type_"; + /** - * 安全随机数,重用性能与随机数质量更高 + * 默认负载均衡为随机加权, 待拓展读取配置, 不同Handler可绑定不同的负载均衡策略 */ - private static final SecureRandom secureRandom = new SecureRandom(); + private static final String loadBalancerStrategy = LoadBalancerStrategy.SERVICE_LOAD_BALANCER_RANDOM_WEIGHT_ENHANCED; @Autowired private SmsRecordDao smsRecordDao; @@ -57,6 +59,8 @@ public class SmsHandler extends BaseHandler{ private ApplicationContext applicationContext; @Autowired private AccountUtils accountUtils; + @Autowired + private ServiceLoadBalancerFactory serviceLoadBalancer; public SmsHandler() { channelCode = ChannelType.SMS.getCode(); @@ -74,7 +78,7 @@ public class SmsHandler extends BaseHandler{ * 1、动态配置做流量负载 * 2、发送短信 */ - MessageTypeSmsConfig[] messageTypeSmsConfigs = loadBalance(getMessageTypeSmsConfig(taskInfo)); + List messageTypeSmsConfigs = serviceLoadBalancer.selectService(getMessageTypeSmsConfig(taskInfo), loadBalancerStrategy); for (MessageTypeSmsConfig messageTypeSmsConfig : messageTypeSmsConfigs) { smsParam.setScriptName(messageTypeSmsConfig.getScriptName()); smsParam.setSendAccountId(messageTypeSmsConfig.getSendAccount()); @@ -90,41 +94,6 @@ public class SmsHandler extends BaseHandler{ return false; } - /** - * 流量负载 - * 根据配置的权重优先走某个账号,并取出一个备份的 - * - * @param messageTypeSmsConfigs - */ - private MessageTypeSmsConfig[] loadBalance(List messageTypeSmsConfigs) { - - int total = 0; - for (MessageTypeSmsConfig channelConfig : messageTypeSmsConfigs) { - total += channelConfig.getWeights(); - } - - // 生成一个随机数[1,total],看落到哪个区间 - int index = secureRandom.nextInt(total) + 1; - - MessageTypeSmsConfig supplier = null; - MessageTypeSmsConfig supplierBack = null; - for (int i = 0; i < messageTypeSmsConfigs.size(); ++i) { - if (index <= messageTypeSmsConfigs.get(i).getWeights()) { - supplier = messageTypeSmsConfigs.get(i); - - // 取下一个供应商 - int j = (i + 1) % messageTypeSmsConfigs.size(); - if (i == j) { - return new MessageTypeSmsConfig[]{supplier}; - } - supplierBack = messageTypeSmsConfigs.get(j); - return new MessageTypeSmsConfig[]{supplier, supplierBack}; - } - index -= messageTypeSmsConfigs.get(i).getWeights(); - } - return new MessageTypeSmsConfig[0]; - } - /** * 如模板指定具体的明确账号,则优先发其账号,否则走到流量配置 *

diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/loadbalance/ServiceLoadBalancer.java b/austin-handler/src/main/java/com/java3y/austin/handler/loadbalance/ServiceLoadBalancer.java new file mode 100644 index 0000000..8861022 --- /dev/null +++ b/austin-handler/src/main/java/com/java3y/austin/handler/loadbalance/ServiceLoadBalancer.java @@ -0,0 +1,25 @@ +package com.java3y.austin.handler.loadbalance; + +import java.util.List; + +/** + * @Author Gavin + * @Date 2024/9/14 + */ +public interface ServiceLoadBalancer { + + /** + * 以负载均衡的方式选取一个服务节点 + * @param servers 服务列表 + * @return 可用的服务节点 + */ + T selectOne(List servers); + + /** + * 以负载均衡的方式选取一个服务节点和一个备选服务节点 + * @param servers 服务列表 + * @return 可用的服务节点 + */ + List select(List servers); +} + diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/loadbalance/ServiceLoadBalancerFactory.java b/austin-handler/src/main/java/com/java3y/austin/handler/loadbalance/ServiceLoadBalancerFactory.java new file mode 100644 index 0000000..d2cafb9 --- /dev/null +++ b/austin-handler/src/main/java/com/java3y/austin/handler/loadbalance/ServiceLoadBalancerFactory.java @@ -0,0 +1,55 @@ +package com.java3y.austin.handler.loadbalance; + +import com.java3y.austin.handler.loadbalance.annotations.LoadBalancer; +import lombok.extern.slf4j.Slf4j; +import org.springframework.aop.support.AopUtils; +import org.springframework.beans.BeansException; +import org.springframework.context.ApplicationContext; +import org.springframework.context.ApplicationContextAware; +import org.springframework.stereotype.Service; + +import javax.annotation.PostConstruct; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; + +/** + * @Author Gavin + * @Date 2024/9/14 + */ +@Service +@Slf4j +public class ServiceLoadBalancerFactory implements ApplicationContextAware { + + private ApplicationContext applicationContext; + + private final Map> serviceLoadBalancerMap = new ConcurrentHashMap<>(); + + @Override + public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { + this.applicationContext = applicationContext; + } + + public List selectService(List servers, String loadbalancerStrategy) { + ServiceLoadBalancer serviceLoadBalancer = serviceLoadBalancerMap.get(loadbalancerStrategy); + if (Objects.isNull(serviceLoadBalancer)) { + log.error("没有找到对应的负载均衡策略"); + return servers; + } + return serviceLoadBalancer.select(servers); + } + + @PostConstruct + private void init() { + Map serviceMap = this.applicationContext.getBeansWithAnnotation(LoadBalancer.class); + serviceMap.forEach((name, service) -> { + if (service instanceof ServiceLoadBalancer) { + LoadBalancer LoadBalancer = AopUtils.getTargetClass(service).getAnnotation(LoadBalancer.class); + String loadbalancerStrategy = LoadBalancer.loadbalancer(); + //通常情况下 实现的负载均衡service与loadBalanceStrategy一一对应 + serviceLoadBalancerMap.put(loadbalancerStrategy, (ServiceLoadBalancer) service); + } + }); + } +} diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/loadbalance/annotations/LoadBalancer.java b/austin-handler/src/main/java/com/java3y/austin/handler/loadbalance/annotations/LoadBalancer.java new file mode 100644 index 0000000..5620642 --- /dev/null +++ b/austin-handler/src/main/java/com/java3y/austin/handler/loadbalance/annotations/LoadBalancer.java @@ -0,0 +1,20 @@ +package com.java3y.austin.handler.loadbalance.annotations; + +import com.java3y.austin.handler.enums.LoadBalancerStrategy; +import org.springframework.stereotype.Service; + +import java.lang.annotation.*; + +/** + * 负载均衡策略 + * @Author Gavin + * @Date 2024/9/14 + */ +@Target({ElementType.TYPE}) +@Retention(RetentionPolicy.RUNTIME) +@Documented +@Service +public @interface LoadBalancer { + + String loadbalancer() default LoadBalancerStrategy.SERVICE_LOAD_BALANCER_RANDOM_WEIGHT_ENHANCED; +} diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/loadbalance/base/BaseEnhancedServiceLoadBalancer.java b/austin-handler/src/main/java/com/java3y/austin/handler/loadbalance/base/BaseEnhancedServiceLoadBalancer.java new file mode 100644 index 0000000..c5a5511 --- /dev/null +++ b/austin-handler/src/main/java/com/java3y/austin/handler/loadbalance/base/BaseEnhancedServiceLoadBalancer.java @@ -0,0 +1,30 @@ +package com.java3y.austin.handler.loadbalance.base; + +import com.java3y.austin.handler.domain.sms.MessageTypeSmsConfig; +import com.java3y.austin.handler.loadbalance.ServiceLoadBalancer; + +import java.util.ArrayList; +import java.util.List; +import java.util.stream.IntStream; + +/** + * @Author Gavin + * @Date 2024/9/14 + */ +public abstract class BaseEnhancedServiceLoadBalancer implements ServiceLoadBalancer { + + + /** + * 根据权重重新生成服务元数据列表,权重越高的元数据,会在最终的列表中出现的次数越多 + * 例如,权重为1,最终出现1次,权重为2,最终出现2次,权重为3,最终出现3次,依此类推... + */ + protected List getWeightMessageTypeSmsConfigList(List servers) { + List list = new ArrayList<>(); + servers.forEach((server) -> { + IntStream.range(0, server.getWeights()).forEach((i) -> { + list.add(server); + }); + }); + return list; + } +} diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/loadbalance/impl/RandomWeightEnhancedLoadBalancerImpl.java b/austin-handler/src/main/java/com/java3y/austin/handler/loadbalance/impl/RandomWeightEnhancedLoadBalancerImpl.java new file mode 100644 index 0000000..52a1bdf --- /dev/null +++ b/austin-handler/src/main/java/com/java3y/austin/handler/loadbalance/impl/RandomWeightEnhancedLoadBalancerImpl.java @@ -0,0 +1,69 @@ +package com.java3y.austin.handler.loadbalance.impl; + +import com.java3y.austin.handler.domain.sms.MessageTypeSmsConfig; +import com.java3y.austin.handler.enums.LoadBalancerStrategy; +import com.java3y.austin.handler.loadbalance.annotations.LoadBalancer; +import com.java3y.austin.handler.loadbalance.base.BaseEnhancedServiceLoadBalancer; +import lombok.extern.slf4j.Slf4j; + +import java.security.SecureRandom; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +/** + * @Author Gavin + * @Date 2024/9/14 + */ +@Slf4j +@LoadBalancer(loadbalancer = LoadBalancerStrategy.SERVICE_LOAD_BALANCER_RANDOM_WEIGHT_ENHANCED) +public class RandomWeightEnhancedLoadBalancerImpl extends BaseEnhancedServiceLoadBalancer { + + /** + * 安全随机数,重用性能与随机数质量更高 + */ + private static final SecureRandom secureRandom = new SecureRandom(); + + @Override + public MessageTypeSmsConfig selectOne(List servers) { + if (servers == null || servers.isEmpty()){ + return null; + } + servers = this.getWeightMessageTypeSmsConfigList(servers); + int index = secureRandom.nextInt(servers.size()); + return servers.get(index); + } + + @Override + public List select(List servers) { + if (servers == null || servers.isEmpty()){ + return null; + } + int total = 0; + for (MessageTypeSmsConfig channelConfig : servers) { + total += channelConfig.getWeights(); + } + // 生成一个随机数[1,total],看落到哪个区间 + int index = secureRandom.nextInt(total) + 1; + + List selectedServers = new ArrayList<>(); + MessageTypeSmsConfig supplier = null; + MessageTypeSmsConfig supplierBack = null; + for (int i = 0; i < servers.size(); ++i) { + if (index <= servers.get(i).getWeights()) { + supplier = servers.get(i); + + // 取下一个供应商 + int j = (i + 1) % servers.size(); + if (i == j) { + return Collections.singletonList(supplier); + } + supplierBack = servers.get(j); + return Arrays.asList(supplier, supplierBack); + } + index -= servers.get(i).getWeights(); + } + return selectedServers; + } +} diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/loadbalance/impl/RobinServiceLoadBalancerImpl.java b/austin-handler/src/main/java/com/java3y/austin/handler/loadbalance/impl/RobinServiceLoadBalancerImpl.java new file mode 100644 index 0000000..9863997 --- /dev/null +++ b/austin-handler/src/main/java/com/java3y/austin/handler/loadbalance/impl/RobinServiceLoadBalancerImpl.java @@ -0,0 +1,54 @@ +package com.java3y.austin.handler.loadbalance.impl; + +import com.java3y.austin.handler.domain.sms.MessageTypeSmsConfig; +import com.java3y.austin.handler.enums.LoadBalancerStrategy; +import com.java3y.austin.handler.loadbalance.ServiceLoadBalancer; +import com.java3y.austin.handler.loadbalance.annotations.LoadBalancer; +import lombok.extern.slf4j.Slf4j; + +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * @Author Gavin + * @Date 2024/9/14 + */ +@Slf4j +@LoadBalancer(loadbalancer = LoadBalancerStrategy.SERVICE_LOAD_BALANCER_ROBIN) +public class RobinServiceLoadBalancerImpl implements ServiceLoadBalancer { + private volatile AtomicInteger atomicInteger = new AtomicInteger(0); + + @Override + public MessageTypeSmsConfig selectOne(List servers) { + if (servers == null || servers.isEmpty()) { + return null; + } + int count = servers.size(); + int index = atomicInteger.incrementAndGet(); + if (index >= (Integer.MAX_VALUE - 10000)) { + // 当AtomicInteger递增后的值大于或者等于Integer的最大值减去10000时,会将AtomicInteger的值重置为0。 + // 这里之所以是大于或者等于Integer的最大值减去10000,是为了避免在高并发环境下由于竞态条件问题导致AtomicInteger自增后的值超过Integer的最大值,从而发生范围越界的问题。 + atomicInteger.set(0); + } + return servers.get(index % count); + } + + @Override + public List select(List servers) { + if (servers == null || servers.isEmpty()) { + return null; + } + if (servers.size() == 1) { + return servers; + } + int count = servers.size(); + int index = atomicInteger.incrementAndGet(); + if (index >= (Integer.MAX_VALUE - 10000)) { + atomicInteger.set(0); + } + int currentIndex = index % count; + int nextIndex = (index + 1) % count; + return Arrays.asList(servers.get(currentIndex), servers.get(nextIndex)); + } +}