diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/id/GenerateId.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/id/GenerateId.java new file mode 100644 index 00000000..f32ebb8a --- /dev/null +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/id/GenerateId.java @@ -0,0 +1,45 @@ +package com.xxl.job.admin.core.id; + +import com.xxl.job.admin.core.id.service.MachineService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import javax.annotation.PostConstruct; + +@Component +public class GenerateId { + + private final Logger logger = LoggerFactory.getLogger(GenerateId.class); + + private SnowflakeIdWorker idWorker = null; + + @Autowired + private MachineService machineService; + + private Integer machineId = -1; + + public Long getId() { + return idWorker.nextId(); + } + + @PostConstruct + private void getIdBefore() { + //只需要第一次调用 对idworker进行初始化 + machineId = machineService.getInitMachineId(); + idWorker = new SnowflakeIdWorker(machineId); + } + + public Integer getMachineId(){ + return this.machineId; + } + + public void setMachineId(Integer machineId){ + this.machineId = machineId; + } + + public void setIdWorker(SnowflakeIdWorker snowflakeIdWorker){ + this.idWorker = snowflakeIdWorker; + } +} \ No newline at end of file diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/id/HeartBeat.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/id/HeartBeat.java new file mode 100644 index 00000000..5c6d9741 --- /dev/null +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/id/HeartBeat.java @@ -0,0 +1,28 @@ +package com.xxl.job.admin.core.id; + +import com.xxl.job.admin.core.id.service.MachineService; +import com.xxl.job.admin.core.model.XxlJobMachine; +import com.xxl.job.admin.core.util.MachineUtils; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.annotation.EnableScheduling; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; + +import java.util.Date; + +@Component +@EnableScheduling +public class HeartBeat { + + @Autowired + private MachineService machineService; + + @Scheduled(fixedDelay = 10000) + public void checkMachineSurvive(){ + String machineIp = MachineUtils.get(); + XxlJobMachine xxlJobMachine = machineService.selectByMachineIp(machineIp); + if(xxlJobMachine != null){ + machineService.update(machineIp,new Date()); + } + } +} \ No newline at end of file diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/id/SnowflakeIdWorker.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/id/SnowflakeIdWorker.java new file mode 100644 index 00000000..3a660515 --- /dev/null +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/id/SnowflakeIdWorker.java @@ -0,0 +1,95 @@ +package com.xxl.job.admin.core.id; + +public class SnowflakeIdWorker { + + // ==============================Fields=========================================== + /** 开始时间截 (2019-01-01) */ + private final long twepoch = 1546272000000L; + + /** 机器id所占的位数 */ + private final long workerIdBits = 5L; + + /** 数据标识id所占的位数 */ + private final long datacenterIdBits = 5L; + + private final long maxMachineId = 1023; + + /** 序列在id中占的位数 */ + private final long sequenceBits = 12L; + + /** 机器ID向左移12位 */ + private final long getMachineIdShift = sequenceBits; + + /** 时间截向左移22位(5+5+12) */ + private final long timestampLeftShift = sequenceBits + workerIdBits + datacenterIdBits; + + /** 生成序列的掩码,这里为4095 (0b111111111111=0xfff=4095) */ + private final long sequenceMask = -1L ^ (-1L << sequenceBits); + + /** 机器Id */ + private long machineId; + + /** 毫秒内序列(0~4095) */ + private long sequence = 0L; + + /** 上次生成ID的时间截 */ + private long lastTimestamp = -1L; + + // ==============================Constructors===================================== + /** + * 构造函数 + * + * @param machineId 工作ID (0~31) + */ + public SnowflakeIdWorker(long machineId) { + if (machineId > maxMachineId || machineId < 0) { + throw new IllegalArgumentException( + String.format("worker machineId can't be greater than %d or less than 0", maxMachineId)); + } + this.machineId = machineId; + } + + // ==============================Methods========================================== + /** + * 获得下一个ID (该方法是线程安全的) + * + * @return SnowflakeId + */ + public synchronized long nextId() { + long timestamp = timeGen(); + + // 如果是同一时间生成的,则进行毫秒内序列 + if (timestamp <= lastTimestamp) { + + timestamp = lastTimestamp; + sequence = (sequence + 1) & sequenceMask; + // 毫秒内序列溢出 + if (sequence == 0) { + // 阻塞到下一个毫秒,获得新的时间戳 + timestamp++; + } + } + // 时间戳改变,毫秒内序列重置 + else { + sequence = 0L; + } + + // 上次生成ID的时间截 + lastTimestamp = timestamp; + + // 移位并通过或运算拼到一起组成64位的ID + return ((timestamp - twepoch) << timestampLeftShift) // + | (machineId << getMachineIdShift) // + | sequence; + } + + /** + * 返回以毫秒为单位的当前时间 + * + * @return 当前时间(毫秒) + */ + protected long timeGen() { + return System.currentTimeMillis(); + } + +} diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/id/impl/MachineServiceImpl.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/id/impl/MachineServiceImpl.java new file mode 100644 index 00000000..2749932d --- /dev/null +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/id/impl/MachineServiceImpl.java @@ -0,0 +1,80 @@ +package com.xxl.job.admin.core.id.impl; + +import com.xxl.job.admin.core.id.service.MachineService; +import com.xxl.job.admin.core.model.XxlJobMachine; +import com.xxl.job.admin.core.util.MachineUtils; +import com.xxl.job.admin.dao.XxlJobMachineDao; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.dao.DuplicateKeyException; +import org.springframework.stereotype.Service; + +import java.util.Date; +import java.util.Random; + +@Service +public class MachineServiceImpl implements MachineService { + + private final Logger logger = LoggerFactory.getLogger(MachineServiceImpl.class); + + @Autowired + private XxlJobMachineDao xxlJobMachineDao; + + @Override + public void save(XxlJobMachine xxlJobMachine) { + xxlJobMachineDao.save(xxlJobMachine); + } + + @Override + public void update(String machineIp, Date heartLastTime) { + xxlJobMachineDao.update(machineIp,heartLastTime); + } + + @Override + public XxlJobMachine selectByMachineIp(String machineIp) { + return xxlJobMachineDao.selectByHostIp(machineIp); + } + + @Override + public Integer selectMaxMachineId() { + return xxlJobMachineDao.selectMaxMachineId(); + } + + @Override + public Integer getInitMachineId() { + XxlJobMachine xxlJobMachine = selectByMachineIp(MachineUtils.get()); + Date nowDate = new Date(); + int machineId = -1; + if(xxlJobMachine != null){ + update(MachineUtils.get(),nowDate); + machineId = xxlJobMachine.getMachineId(); + }else{ + xxlJobMachine = new XxlJobMachine(); + xxlJobMachine.setMachineIp(MachineUtils.get()); + xxlJobMachine.setAddTime(nowDate); + xxlJobMachine.setHeartLastTime(nowDate); + Random random = new Random(); + for(int i = 0; i < 100; i++){ + try { + Integer value = selectMaxMachineId(); + machineId = value == null ? 1 : value+1; + xxlJobMachine.setMachineId(machineId); + save(xxlJobMachine); + break; + } catch (DuplicateKeyException e) { + try { + Thread.sleep(random.nextInt(1000)+1); + } catch (InterruptedException interruptedException) { + logger.error("sleep error,cause:",interruptedException); + } + logger.error("retry >>>>>>>>>>>>> "); + } catch (Exception e){ + logger.error("save error >>>>>>,system exit,cause:",e); + System.exit(0); + } + } + } + return machineId; + } +} diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/id/service/MachineService.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/id/service/MachineService.java new file mode 100644 index 00000000..d02f10e9 --- /dev/null +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/id/service/MachineService.java @@ -0,0 +1,18 @@ +package com.xxl.job.admin.core.id.service; + +import com.xxl.job.admin.core.model.XxlJobMachine; + +import java.util.Date; + +public interface MachineService { + + void save(XxlJobMachine xxlJobMachine); + + void update(String machineIp, Date heartLastTime); + + XxlJobMachine selectByMachineIp(String machineIp); + + Integer selectMaxMachineId(); + + Integer getInitMachineId(); +} diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/model/XxlJobMachine.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/model/XxlJobMachine.java new file mode 100644 index 00000000..e4193af0 --- /dev/null +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/model/XxlJobMachine.java @@ -0,0 +1,58 @@ +package com.xxl.job.admin.core.model; + +import java.util.Date; + +public class XxlJobMachine { + + /** + * 主机IP + */ + private String machineIp; + + /** + * 主机IP对应的机器码 + */ + private Integer machineId; + + /** + * 创建时间 + */ + private Date addTime; + + /** + * 最后一次心跳时间 + */ + private Date heartLastTime; + + public Date getHeartLastTime() { + return heartLastTime; + } + + public void setHeartLastTime(Date heartLastTime) { + this.heartLastTime = heartLastTime; + } + + public Integer getMachineId() { + return machineId; + } + + public void setMachineId(Integer machineId) { + this.machineId = machineId; + } + + public Date getAddTime() { + return addTime; + } + + public void setAddTime(Date addTime) { + this.addTime = addTime; + } + + public String getMachineIp() { + return machineIp; + } + + public void setMachineIp(String machineIp) { + this.machineIp = machineIp; + } +} diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/util/MachineUtils.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/util/MachineUtils.java new file mode 100644 index 00000000..703443b2 --- /dev/null +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/util/MachineUtils.java @@ -0,0 +1,27 @@ +package com.xxl.job.admin.core.util; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.InetAddress; +import java.net.UnknownHostException; + +public class MachineUtils { + + private static final Logger logger = LoggerFactory.getLogger(MachineUtils.class); + + private static String machineIp = null; + + public static String get() { + if(machineIp == null){ + try { + //获取本机IP地址 + machineIp = InetAddress.getLocalHost().getHostAddress(); + } catch (UnknownHostException e) { + logger.error("get host error:",e); + throw new RuntimeException("get host error"); + } + } + return machineIp; + } +} diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/dao/XxlJobMachineDao.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/dao/XxlJobMachineDao.java new file mode 100644 index 00000000..88ac0aae --- /dev/null +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/dao/XxlJobMachineDao.java @@ -0,0 +1,20 @@ +package com.xxl.job.admin.dao; + +import com.xxl.job.admin.core.model.XxlJobMachine; +import org.apache.ibatis.annotations.Mapper; +import org.apache.ibatis.annotations.Param; + +import java.util.Date; + +@Mapper +public interface XxlJobMachineDao { + + void save(@Param("xxlJobMachine")XxlJobMachine xxlJobMachine); + + void update(@Param("machineIp") String machineIp, @Param("heartLastTime") Date heartLastTime); + + XxlJobMachine selectByHostIp(String machineIp); + + Integer selectMaxMachineId(); + +} diff --git a/xxl-job-admin/src/main/resources/mybatis-mapper/XxlJobMachineMapper.xml b/xxl-job-admin/src/main/resources/mybatis-mapper/XxlJobMachineMapper.xml new file mode 100644 index 00000000..7bd2fcd4 --- /dev/null +++ b/xxl-job-admin/src/main/resources/mybatis-mapper/XxlJobMachineMapper.xml @@ -0,0 +1,38 @@ + + + + + + + + + + + + + machine_ip,machine_id,add_time,heart_last_time + + + + + + INSERT INTO xxl_job_machine ( machine_ip, machine_id, add_time, heart_last_time) + VALUES ( #{xxlJobMachine.machineIp}, #{xxlJobMachine.machineId}, #{xxlJobMachine.addTime,jdbcType=TIMESTAMP}, #{xxlJobMachine.heartLastTime,jdbcType=TIMESTAMP}) + + + + UPDATE xxl_job_machine + SET heart_last_time = #{heartLastTime,jdbcType=TIMESTAMP} + WHERE machine_ip = #{machineIp} + + + + + \ No newline at end of file