扩展负载均衡策略,添加轮询策略的负载均衡实现

master
GAhlin 4 months ago
parent ca4d91a8c2
commit 0c7fbc79b3

@ -0,0 +1,41 @@
package com.java3y.austin.handler.enums;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.ToString;
/**
* @Author Gavin
* @Date 2024/9/14
*/
@Getter
@ToString
@AllArgsConstructor
public class LoadBalancerStrategy {
/**
*
*/
public static final String SERVICE_LOAD_BALANCER_RANDOM = "random";
/**
*
*/
public static final String SERVICE_LOAD_BALANCER_RANDOM_WEIGHT_ENHANCED = "random_weight";
/**
*
*/
public static final String SERVICE_LOAD_BALANCER_HASH = "hash";
/**
*
*/
public static final String SERVICE_LOAD_BALANCER_ROBIN = "robin";
/**
*
*/
public static final String SERVICE_LOAD_BALANCER_ROBIN_WEIGHT = "robin_weight";
}

@ -13,7 +13,9 @@ import com.java3y.austin.common.dto.model.SmsContentModel;
import com.java3y.austin.common.enums.ChannelType;
import com.java3y.austin.handler.domain.sms.MessageTypeSmsConfig;
import com.java3y.austin.handler.domain.sms.SmsParam;
import com.java3y.austin.handler.enums.LoadBalancerStrategy;
import com.java3y.austin.handler.handler.BaseHandler;
import com.java3y.austin.handler.loadbalance.ServiceLoadBalancerFactory;
import com.java3y.austin.handler.script.SmsScript;
import com.java3y.austin.support.dao.SmsRecordDao;
import com.java3y.austin.support.domain.SmsRecord;
@ -24,7 +26,6 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Component;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@ -44,10 +45,11 @@ public class SmsHandler extends BaseHandler{
private static final Integer AUTO_FLOW_RULE = 0;
private static final String FLOW_KEY = "msgTypeSmsConfig";
private static final String FLOW_KEY_PREFIX = "message_type_";
/**
*
* , , Handler
*/
private static final SecureRandom secureRandom = new SecureRandom();
private static final String loadBalancerStrategy = LoadBalancerStrategy.SERVICE_LOAD_BALANCER_RANDOM_WEIGHT_ENHANCED;
@Autowired
private SmsRecordDao smsRecordDao;
@ -57,6 +59,8 @@ public class SmsHandler extends BaseHandler{
private ApplicationContext applicationContext;
@Autowired
private AccountUtils accountUtils;
@Autowired
private ServiceLoadBalancerFactory<MessageTypeSmsConfig> serviceLoadBalancer;
public SmsHandler() {
channelCode = ChannelType.SMS.getCode();
@ -74,7 +78,7 @@ public class SmsHandler extends BaseHandler{
* 1
* 2
*/
MessageTypeSmsConfig[] messageTypeSmsConfigs = loadBalance(getMessageTypeSmsConfig(taskInfo));
List<MessageTypeSmsConfig> messageTypeSmsConfigs = serviceLoadBalancer.selectService(getMessageTypeSmsConfig(taskInfo), loadBalancerStrategy);
for (MessageTypeSmsConfig messageTypeSmsConfig : messageTypeSmsConfigs) {
smsParam.setScriptName(messageTypeSmsConfig.getScriptName());
smsParam.setSendAccountId(messageTypeSmsConfig.getSendAccount());
@ -90,41 +94,6 @@ public class SmsHandler extends BaseHandler{
return false;
}
/**
*
*
*
* @param messageTypeSmsConfigs
*/
private MessageTypeSmsConfig[] loadBalance(List<MessageTypeSmsConfig> messageTypeSmsConfigs) {
int total = 0;
for (MessageTypeSmsConfig channelConfig : messageTypeSmsConfigs) {
total += channelConfig.getWeights();
}
// 生成一个随机数[1,total],看落到哪个区间
int index = secureRandom.nextInt(total) + 1;
MessageTypeSmsConfig supplier = null;
MessageTypeSmsConfig supplierBack = null;
for (int i = 0; i < messageTypeSmsConfigs.size(); ++i) {
if (index <= messageTypeSmsConfigs.get(i).getWeights()) {
supplier = messageTypeSmsConfigs.get(i);
// 取下一个供应商
int j = (i + 1) % messageTypeSmsConfigs.size();
if (i == j) {
return new MessageTypeSmsConfig[]{supplier};
}
supplierBack = messageTypeSmsConfigs.get(j);
return new MessageTypeSmsConfig[]{supplier, supplierBack};
}
index -= messageTypeSmsConfigs.get(i).getWeights();
}
return new MessageTypeSmsConfig[0];
}
/**
*
* <p>

@ -0,0 +1,25 @@
package com.java3y.austin.handler.loadbalance;
import java.util.List;
/**
* @Author Gavin
* @Date 2024/9/14
*/
public interface ServiceLoadBalancer<T> {
/**
*
* @param servers
* @return
*/
T selectOne(List<T> servers);
/**
*
* @param servers
* @return
*/
List<T> select(List<T> servers);
}

@ -0,0 +1,55 @@
package com.java3y.austin.handler.loadbalance;
import com.java3y.austin.handler.loadbalance.annotations.LoadBalancer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.aop.support.AopUtils;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
/**
* @Author Gavin
* @Date 2024/9/14
*/
@Service
@Slf4j
public class ServiceLoadBalancerFactory<T> implements ApplicationContextAware {
private ApplicationContext applicationContext;
private final Map<String, ServiceLoadBalancer<T>> serviceLoadBalancerMap = new ConcurrentHashMap<>();
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
public List<T> selectService(List<T> servers, String loadbalancerStrategy) {
ServiceLoadBalancer<T> serviceLoadBalancer = serviceLoadBalancerMap.get(loadbalancerStrategy);
if (Objects.isNull(serviceLoadBalancer)) {
log.error("没有找到对应的负载均衡策略");
return servers;
}
return serviceLoadBalancer.select(servers);
}
@PostConstruct
private void init() {
Map<String, Object> serviceMap = this.applicationContext.getBeansWithAnnotation(LoadBalancer.class);
serviceMap.forEach((name, service) -> {
if (service instanceof ServiceLoadBalancer) {
LoadBalancer LoadBalancer = AopUtils.getTargetClass(service).getAnnotation(LoadBalancer.class);
String loadbalancerStrategy = LoadBalancer.loadbalancer();
//通常情况下 实现的负载均衡service与loadBalanceStrategy一一对应
serviceLoadBalancerMap.put(loadbalancerStrategy, (ServiceLoadBalancer) service);
}
});
}
}

@ -0,0 +1,22 @@
package com.java3y.austin.handler.loadbalance.annotations;
import com.java3y.austin.handler.enums.LoadBalancerStrategy;
import org.springframework.stereotype.Service;
import java.lang.annotation.*;
/**
*
* Created by TOM
* On 2022/7/21 17:03
*
* @author TOM
*/
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Service
public @interface LoadBalancer {
String loadbalancer() default LoadBalancerStrategy.SERVICE_LOAD_BALANCER_RANDOM_WEIGHT_ENHANCED;
}

@ -0,0 +1,30 @@
package com.java3y.austin.handler.loadbalance.base;
import com.java3y.austin.handler.domain.sms.MessageTypeSmsConfig;
import com.java3y.austin.handler.loadbalance.ServiceLoadBalancer;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.IntStream;
/**
* @Author Gavin
* @Date 2024/9/14
*/
public abstract class BaseEnhancedServiceLoadBalancer implements ServiceLoadBalancer<MessageTypeSmsConfig> {
/**
*
* 112233...
*/
protected List<MessageTypeSmsConfig> getWeightMessageTypeSmsConfigList(List<MessageTypeSmsConfig> servers) {
List<MessageTypeSmsConfig> list = new ArrayList<>();
servers.forEach((server) -> {
IntStream.range(0, server.getWeights()).forEach((i) -> {
list.add(server);
});
});
return list;
}
}

@ -0,0 +1,69 @@
package com.java3y.austin.handler.loadbalance.impl;
import com.java3y.austin.handler.domain.sms.MessageTypeSmsConfig;
import com.java3y.austin.handler.enums.LoadBalancerStrategy;
import com.java3y.austin.handler.loadbalance.annotations.LoadBalancer;
import com.java3y.austin.handler.loadbalance.base.BaseEnhancedServiceLoadBalancer;
import lombok.extern.slf4j.Slf4j;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
/**
* @Author Gavin
* @Date 2024/9/14
*/
@Slf4j
@LoadBalancer(loadbalancer = LoadBalancerStrategy.SERVICE_LOAD_BALANCER_RANDOM_WEIGHT_ENHANCED)
public class RandomWeightEnhancedLoadBalancerImpl extends BaseEnhancedServiceLoadBalancer {
/**
*
*/
private static final SecureRandom secureRandom = new SecureRandom();
@Override
public MessageTypeSmsConfig selectOne(List<MessageTypeSmsConfig> servers) {
if (servers == null || servers.isEmpty()){
return null;
}
servers = this.getWeightMessageTypeSmsConfigList(servers);
int index = secureRandom.nextInt(servers.size());
return servers.get(index);
}
@Override
public List<MessageTypeSmsConfig> select(List<MessageTypeSmsConfig> servers) {
if (servers == null || servers.isEmpty()){
return null;
}
int total = 0;
for (MessageTypeSmsConfig channelConfig : servers) {
total += channelConfig.getWeights();
}
// 生成一个随机数[1,total],看落到哪个区间
int index = secureRandom.nextInt(total) + 1;
List<MessageTypeSmsConfig> selectedServers = new ArrayList<>();
MessageTypeSmsConfig supplier = null;
MessageTypeSmsConfig supplierBack = null;
for (int i = 0; i < servers.size(); ++i) {
if (index <= servers.get(i).getWeights()) {
supplier = servers.get(i);
// 取下一个供应商
int j = (i + 1) % servers.size();
if (i == j) {
return Collections.singletonList(supplier);
}
supplierBack = servers.get(j);
return Arrays.asList(supplier, supplierBack);
}
index -= servers.get(i).getWeights();
}
return selectedServers;
}
}

@ -0,0 +1,54 @@
package com.java3y.austin.handler.loadbalance.impl;
import com.java3y.austin.handler.domain.sms.MessageTypeSmsConfig;
import com.java3y.austin.handler.enums.LoadBalancerStrategy;
import com.java3y.austin.handler.loadbalance.ServiceLoadBalancer;
import com.java3y.austin.handler.loadbalance.annotations.LoadBalancer;
import lombok.extern.slf4j.Slf4j;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @Author Gavin
* @Date 2024/9/14
*/
@Slf4j
@LoadBalancer(loadbalancer = LoadBalancerStrategy.SERVICE_LOAD_BALANCER_ROBIN)
public class RobinServiceLoadBalancerImpl implements ServiceLoadBalancer<MessageTypeSmsConfig> {
private volatile AtomicInteger atomicInteger = new AtomicInteger(0);
@Override
public MessageTypeSmsConfig selectOne(List<MessageTypeSmsConfig> servers) {
if (servers == null || servers.isEmpty()) {
return null;
}
int count = servers.size();
int index = atomicInteger.incrementAndGet();
if (index >= (Integer.MAX_VALUE - 10000)) {
// 当AtomicInteger递增后的值大于或者等于Integer的最大值减去10000时会将AtomicInteger的值重置为0。
// 这里之所以是大于或者等于Integer的最大值减去10000是为了避免在高并发环境下由于竞态条件问题导致AtomicInteger自增后的值超过Integer的最大值从而发生范围越界的问题。
atomicInteger.set(0);
}
return servers.get(index % count);
}
@Override
public List<MessageTypeSmsConfig> select(List<MessageTypeSmsConfig> servers) {
if (servers == null || servers.isEmpty()) {
return null;
}
if (servers.size() == 1) {
return servers;
}
int count = servers.size();
int index = atomicInteger.incrementAndGet();
if (index >= (Integer.MAX_VALUE - 10000)) {
atomicInteger.set(0);
}
int currentIndex = index % count;
int nextIndex = (index + 1) % count;
return Arrays.asList(servers.get(currentIndex), servers.get(nextIndex));
}
}
Loading…
Cancel
Save