From 68efe5fafa12fd6ce17983c7cacf3cb884f60c2b Mon Sep 17 00:00:00 2001 From: "yong.li07" Date: Tue, 4 Jul 2023 10:27:25 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BB=BB=E5=8A=A1=E5=9D=87=E6=91=8A?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- doc/db/tables_xxl_job.sql | 15 +- .../admin/core/conf/XxlJobAdminConfig.java | 54 ++++++- .../job/admin/core/model/XxlJobCluster.java | 26 +++ .../xxl/job/admin/core/model/XxlJobInfo.java | 19 +++ .../admin/core/thread/JobRegistryHelper.java | 38 +++++ .../admin/core/thread/JobScheduleHelper.java | 21 ++- .../core/thread/JobTriggerPoolHelper.java | 4 +- .../xxl/job/admin/dao/XxlJobClusterDao.java | 17 ++ .../com/xxl/job/admin/dao/XxlJobInfoDao.java | 62 +++++--- .../xxl/job/admin/service/JobAllocation.java | 22 +++ .../service/impl/AverageJobAllocation.java | 149 ++++++++++++++++++ .../admin/service/impl/XxlJobServiceImpl.java | 13 +- .../src/main/resources/application.properties | 9 +- xxl-job-admin/src/main/resources/logback.xml | 2 +- .../mybatis-mapper/XxlJobClusterMapper.xml | 17 ++ .../mybatis-mapper/XxlJobInfoMapper.xml | 40 ++++- .../service/jobhandler/SampleXxlJob.java | 2 + .../src/main/resources/application.properties | 6 +- .../src/main/resources/logback.xml | 2 +- 19 files changed, 474 insertions(+), 44 deletions(-) create mode 100644 xxl-job-admin/src/main/java/com/xxl/job/admin/core/model/XxlJobCluster.java create mode 100644 xxl-job-admin/src/main/java/com/xxl/job/admin/dao/XxlJobClusterDao.java create mode 100644 xxl-job-admin/src/main/java/com/xxl/job/admin/service/JobAllocation.java create mode 100644 xxl-job-admin/src/main/java/com/xxl/job/admin/service/impl/AverageJobAllocation.java create mode 100644 xxl-job-admin/src/main/resources/mybatis-mapper/XxlJobClusterMapper.xml diff --git a/doc/db/tables_xxl_job.sql b/doc/db/tables_xxl_job.sql index a3202128..61851114 100644 --- a/doc/db/tables_xxl_job.sql +++ b/doc/db/tables_xxl_job.sql @@ -29,10 +29,14 @@ CREATE TABLE `xxl_job_info` ( `glue_remark` varchar(128) DEFAULT NULL COMMENT 'GLUE备注', `glue_updatetime` datetime DEFAULT NULL COMMENT 'GLUE更新时间', `child_jobid` varchar(255) DEFAULT NULL COMMENT '子任务ID,多个逗号分隔', - `trigger_status` tinyint(4) NOT NULL DEFAULT '0' COMMENT '调度状态:0-停止,1-运行', + `trigger_status` tinyint(4) NOT NULL DEFAULT '0' COMMENT '调度状态:0-停止,1-运行 2.预备执行中', `trigger_last_time` bigint(13) NOT NULL DEFAULT '0' COMMENT '上次调度时间', `trigger_next_time` bigint(13) NOT NULL DEFAULT '0' COMMENT '下次调度时间', - PRIMARY KEY (`id`) + `host_name` varchar(100) NOT NULL DEFAULT '' COMMENT '实例名', + `lock_status` tinyint(4) NOT NULL DEFAULT '0' COMMENT '锁状态:1-改任务锁定中 0-未锁定', + PRIMARY KEY (`id`), + KEY `idx_host_name` (`host_name`), + KEY `idx_lock` (`lock_status`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; CREATE TABLE `xxl_job_log` ( @@ -113,10 +117,17 @@ CREATE TABLE `xxl_job_lock` ( PRIMARY KEY (`lock_name`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; +CREATE TABLE `xxl_job_cluster` ( + `host_name` varchar(100) NOT NULL COMMENT '实例名', + `update_time` datetime DEFAULT NULL comment '更新时间', + PRIMARY KEY (`host_name`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; + INSERT INTO `xxl_job_group`(`id`, `app_name`, `title`, `address_type`, `address_list`, `update_time`) VALUES (1, 'xxl-job-executor-sample', '示例执行器', 0, NULL, '2018-11-03 22:21:31' ); INSERT INTO `xxl_job_info`(`id`, `job_group`, `job_desc`, `add_time`, `update_time`, `author`, `alarm_email`, `schedule_type`, `schedule_conf`, `misfire_strategy`, `executor_route_strategy`, `executor_handler`, `executor_param`, `executor_block_strategy`, `executor_timeout`, `executor_fail_retry_count`, `glue_type`, `glue_source`, `glue_remark`, `glue_updatetime`, `child_jobid`) VALUES (1, 1, '测试任务1', '2018-11-03 22:21:31', '2018-11-03 22:21:31', 'XXL', '', 'CRON', '0 0 0 * * ? *', 'DO_NOTHING', 'FIRST', 'demoJobHandler', '', 'SERIAL_EXECUTION', 0, 0, 'BEAN', '', 'GLUE代码初始化', '2018-11-03 22:21:31', ''); INSERT INTO `xxl_job_user`(`id`, `username`, `password`, `role`, `permission`) VALUES (1, 'admin', 'e10adc3949ba59abbe56e057f20f883e', 1, NULL); INSERT INTO `xxl_job_lock` ( `lock_name`) VALUES ( 'schedule_lock'); +INSERT INTO `xxl_job_lock` ( `lock_name`) VALUES ( 'flush_lock'); commit; diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/conf/XxlJobAdminConfig.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/conf/XxlJobAdminConfig.java index 380b8a59..55a25146 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/conf/XxlJobAdminConfig.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/conf/XxlJobAdminConfig.java @@ -1,8 +1,15 @@ package com.xxl.job.admin.core.conf; import com.xxl.job.admin.core.alarm.JobAlarmer; +import com.xxl.job.admin.core.model.XxlJobCluster; +import com.xxl.job.admin.core.model.XxlJobInfo; import com.xxl.job.admin.core.scheduler.XxlJobScheduler; import com.xxl.job.admin.dao.*; +import com.xxl.job.admin.service.JobAllocation; +import com.xxl.job.admin.service.impl.AverageJobAllocation; +import com.xxl.job.core.util.IpUtil; +import io.micrometer.core.instrument.util.StringUtils; +import io.netty.util.internal.StringUtil; import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Value; @@ -12,6 +19,7 @@ import org.springframework.stereotype.Component; import javax.annotation.Resource; import javax.sql.DataSource; import java.util.Arrays; +import java.util.List; /** * xxl-job config @@ -23,6 +31,7 @@ import java.util.Arrays; public class XxlJobAdminConfig implements InitializingBean, DisposableBean { private static XxlJobAdminConfig adminConfig = null; + public static XxlJobAdminConfig getAdminConfig() { return adminConfig; } @@ -35,9 +44,10 @@ public class XxlJobAdminConfig implements InitializingBean, DisposableBean { @Override public void afterPropertiesSet() throws Exception { adminConfig = this; - + getJobAllocation().init(true); xxlJobScheduler = new XxlJobScheduler(); xxlJobScheduler.init(); + getJobAllocation().flush(); } @Override @@ -67,6 +77,17 @@ public class XxlJobAdminConfig implements InitializingBean, DisposableBean { @Value("${xxl.job.logretentiondays}") private int logretentiondays; + + @Value("${xxl.job.cluster.host.name}") + private String hostName; + + @Value("${server.port}") + private int port; + + @Value("${xxl.job.cluster.enable:false}") + private boolean clusterEnable; + + // dao, service @Resource @@ -80,12 +101,28 @@ public class XxlJobAdminConfig implements InitializingBean, DisposableBean { @Resource private XxlJobLogReportDao xxlJobLogReportDao; @Resource + private XxlJobClusterDao xxlJobClusterDao; + @Resource private JavaMailSender mailSender; @Resource private DataSource dataSource; @Resource private JobAlarmer jobAlarmer; + public String getHostName() { + return StringUtils.isBlank(hostName) ? IpUtil.getIpPort(port) : hostName; + } + + private JobAllocation jobAllocation = defaultJobAllocation; + private static JobAllocation defaultJobAllocation = new JobAllocation() { + @Override + public void allocation(XxlJobInfo jobInfo) { + } + + @Override + public void init(boolean init) { + } + }; public String getI18n() { if (!Arrays.asList("zh_CN", "zh_TC", "en").contains(i18n)) { @@ -155,4 +192,19 @@ public class XxlJobAdminConfig implements InitializingBean, DisposableBean { return jobAlarmer; } + public XxlJobClusterDao getXxlJobClusterDao() { + return xxlJobClusterDao; + } + + public JobAllocation getJobAllocation() { + return jobAllocation.equals(defaultJobAllocation) && clusterEnable ? new AverageJobAllocation() : defaultJobAllocation; + } + + public void setJobAllocation(JobAllocation jobAllocation) { + this.jobAllocation = jobAllocation; + } + + public boolean isClusterEnable() { + return clusterEnable; + } } diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/model/XxlJobCluster.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/model/XxlJobCluster.java new file mode 100644 index 00000000..4a7c2fb6 --- /dev/null +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/model/XxlJobCluster.java @@ -0,0 +1,26 @@ +package com.xxl.job.admin.core.model; + +import java.util.Date; + +public class XxlJobCluster { + + private String hostName; + + private Date updateTime; + + public String getHostName() { + return hostName; + } + + public void setHostName(String hostName) { + this.hostName = hostName; + } + + public Date getUpdateTime() { + return updateTime; + } + + public void setUpdateTime(Date updateTime) { + this.updateTime = updateTime; + } +} diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/model/XxlJobInfo.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/model/XxlJobInfo.java index e47b6dc6..5b38fb45 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/model/XxlJobInfo.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/model/XxlJobInfo.java @@ -41,7 +41,9 @@ public class XxlJobInfo { private int triggerStatus; // 调度状态:0-停止,1-运行 private long triggerLastTime; // 上次调度时间 private long triggerNextTime; // 下次调度时间 + private String hostName; // 运行实例名称 + private int lockStatus; public int getId() { return id; @@ -234,4 +236,21 @@ public class XxlJobInfo { public void setTriggerNextTime(long triggerNextTime) { this.triggerNextTime = triggerNextTime; } + + public String getHostName() { + return hostName; + } + + public void setHostName(String hostName) { + this.hostName = hostName; + } + + public int getLockStatus() { + return lockStatus; + } + + public void setLockStatus(int lockStatus) { + this.lockStatus = lockStatus; + } + } diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobRegistryHelper.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobRegistryHelper.java index 7c487f6b..44c2fbca 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobRegistryHelper.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobRegistryHelper.java @@ -1,17 +1,23 @@ package com.xxl.job.admin.core.thread; import com.xxl.job.admin.core.conf.XxlJobAdminConfig; +import com.xxl.job.admin.core.model.XxlJobCluster; import com.xxl.job.admin.core.model.XxlJobGroup; +import com.xxl.job.admin.core.model.XxlJobInfo; import com.xxl.job.admin.core.model.XxlJobRegistry; import com.xxl.job.core.biz.model.RegistryParam; import com.xxl.job.core.biz.model.ReturnT; import com.xxl.job.core.enums.RegistryConfig; +import com.xxl.job.core.util.DateUtil; +import com.xxl.job.core.util.IpUtil; +import org.apache.catalina.startup.HostConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.util.StringUtils; import java.util.*; import java.util.concurrent.*; +import java.util.stream.Collectors; /** * job registry instance @@ -26,9 +32,18 @@ public class JobRegistryHelper { } private ThreadPoolExecutor registryOrRemoveThreadPool = null; + private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + return new Thread(r, "cluster host registry"); + } + }); + private Thread registryMonitorThread; private volatile boolean toStop = false; + + public void start(){ // for registry or remove @@ -131,6 +146,29 @@ public class JobRegistryHelper { registryMonitorThread.setDaemon(true); registryMonitorThread.setName("xxl-job, admin JobRegistryMonitorHelper-registryMonitorThread"); registryMonitorThread.start(); + + + this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { + + @Override + public void run() { + try { + XxlJobAdminConfig.getAdminConfig().getXxlJobClusterDao().replace(XxlJobAdminConfig.getAdminConfig().getHostName()); + + Date date = new Date(); + + XxlJobAdminConfig.getAdminConfig().getXxlJobClusterDao().delete(DateUtil.addMinutes(date,-5)); + + XxlJobInfo jobInfo = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().findOldClusterInfo(); + if(jobInfo!=null){ + XxlJobAdminConfig.getAdminConfig().getJobAllocation().init(false); + XxlJobAdminConfig.getAdminConfig().getJobAllocation().flush(); + } + } catch (Exception e) { + logger.error("ScheduledTask fetchNameServerAddr exception", e); + } + } + }, 0, 1000 * 30, TimeUnit.MILLISECONDS); } public void toStop(){ diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobScheduleHelper.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobScheduleHelper.java index 906b2306..35baee67 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobScheduleHelper.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobScheduleHelper.java @@ -6,6 +6,7 @@ import com.xxl.job.admin.core.model.XxlJobInfo; import com.xxl.job.admin.core.scheduler.MisfireStrategyEnum; import com.xxl.job.admin.core.scheduler.ScheduleTypeEnum; import com.xxl.job.admin.core.trigger.TriggerTypeEnum; +import com.xxl.job.core.util.IpUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -15,6 +16,7 @@ import java.sql.SQLException; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; /** * @author xuxueli 2019-05-21 @@ -79,8 +81,24 @@ public class JobScheduleHelper { // 1、pre read long nowTime = System.currentTimeMillis(); //TODO 下次执行时间 在5秒内的 整个框架的核心 - List scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount); + List scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao() + .scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount, XxlJobAdminConfig.getAdminConfig().getHostName(), + XxlJobAdminConfig.getAdminConfig().isClusterEnable()); if (scheduleList!=null && scheduleList.size()>0) { + + if(XxlJobAdminConfig.getAdminConfig().isClusterEnable()) { + XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().updateStatusById(scheduleList.stream() + .map(XxlJobInfo::getId).collect(Collectors.toList())); + + try { + conn.commit(); + } catch (SQLException e) { + if (!scheduleThreadToStop) { + logger.error(e.getMessage(), e); + } + } + } + scheduleList.forEach(x->logger.info(x.getId()+"")); // 2、push time-ring for (XxlJobInfo jobInfo: scheduleList) { @@ -154,6 +172,7 @@ public class JobScheduleHelper { // 3、update trigger info for (XxlJobInfo jobInfo: scheduleList) { + jobInfo.setLockStatus(0); XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleUpdate(jobInfo); } diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobTriggerPoolHelper.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobTriggerPoolHelper.java index c4b14581..f27b1bd5 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobTriggerPoolHelper.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobTriggerPoolHelper.java @@ -30,7 +30,7 @@ public class JobTriggerPoolHelper { XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax(), 60L, TimeUnit.SECONDS, - new LinkedBlockingQueue(1000), + new LinkedBlockingQueue(10000), new ThreadFactory() { @Override public Thread newThread(Runnable r) { @@ -43,7 +43,7 @@ public class JobTriggerPoolHelper { XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax(), 60L, TimeUnit.SECONDS, - new LinkedBlockingQueue(2000), + new LinkedBlockingQueue(20000), new ThreadFactory() { @Override public Thread newThread(Runnable r) { diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/dao/XxlJobClusterDao.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/dao/XxlJobClusterDao.java new file mode 100644 index 00000000..8fb5a14a --- /dev/null +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/dao/XxlJobClusterDao.java @@ -0,0 +1,17 @@ +package com.xxl.job.admin.dao; + +import com.xxl.job.admin.core.model.XxlJobCluster; +import org.apache.ibatis.annotations.Mapper; +import org.apache.ibatis.annotations.Param; + +import java.util.Date; +import java.util.List; + +@Mapper +public interface XxlJobClusterDao { + void replace(@Param("ip") String ip); + + List findAll(); + + void delete(@Param("time") Date date); +} diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/dao/XxlJobInfoDao.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/dao/XxlJobInfoDao.java index d640efff..45564ecc 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/dao/XxlJobInfoDao.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/dao/XxlJobInfoDao.java @@ -9,41 +9,53 @@ import java.util.List; /** * job info + * * @author xuxueli 2016-1-12 18:03:45 */ @Mapper public interface XxlJobInfoDao { - public List pageList(@Param("offset") int offset, - @Param("pagesize") int pagesize, - @Param("jobGroup") int jobGroup, - @Param("triggerStatus") int triggerStatus, - @Param("jobDesc") String jobDesc, - @Param("executorHandler") String executorHandler, - @Param("author") String author); - public int pageListCount(@Param("offset") int offset, - @Param("pagesize") int pagesize, - @Param("jobGroup") int jobGroup, - @Param("triggerStatus") int triggerStatus, - @Param("jobDesc") String jobDesc, - @Param("executorHandler") String executorHandler, - @Param("author") String author); - - public int save(XxlJobInfo info); + public List pageList(@Param("offset") int offset, + @Param("pagesize") int pagesize, + @Param("jobGroup") int jobGroup, + @Param("triggerStatus") int triggerStatus, + @Param("jobDesc") String jobDesc, + @Param("executorHandler") String executorHandler, + @Param("author") String author); - public XxlJobInfo loadById(@Param("id") int id); - - public int update(XxlJobInfo xxlJobInfo); - - public int delete(@Param("id") long id); + public int pageListCount(@Param("offset") int offset, + @Param("pagesize") int pagesize, + @Param("jobGroup") int jobGroup, + @Param("triggerStatus") int triggerStatus, + @Param("jobDesc") String jobDesc, + @Param("executorHandler") String executorHandler, + @Param("author") String author); - public List getJobsByGroup(@Param("jobGroup") int jobGroup); + public int save(XxlJobInfo info); - public int findAllCount(); + public XxlJobInfo loadById(@Param("id") int id); - public List scheduleJobQuery(@Param("maxNextTime") long maxNextTime, @Param("pagesize") int pagesize ); + public int update(XxlJobInfo xxlJobInfo); - public int scheduleUpdate(XxlJobInfo xxlJobInfo); + public int delete(@Param("id") long id); + public List getJobsByGroup(@Param("jobGroup") int jobGroup); + public int findAllCount(); + + public List scheduleJobQuery(@Param("maxNextTime") long maxNextTime, @Param("pagesize") int pagesize, + @Param("hostName") String ip,@Param("clusterEnable") boolean clusterEnable); + + public int scheduleUpdate(XxlJobInfo xxlJobInfo); + + + void updateStatusById(@Param("list") List collect); + + List pageById(@Param("id") Integer id); + + void updateHostNameByIds(@Param("hostName") String k, @Param("ids") List v); + + void initLockStatus(@Param("hostName") String hostName,@Param("init") boolean init); + + XxlJobInfo findOldClusterInfo(); } diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/service/JobAllocation.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/service/JobAllocation.java new file mode 100644 index 00000000..245d1881 --- /dev/null +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/service/JobAllocation.java @@ -0,0 +1,22 @@ +package com.xxl.job.admin.service; + +import com.xxl.job.admin.core.model.XxlJobInfo; + +public interface JobAllocation { + + /** + * 计算单个任务的分配 + * @param jobInfo + */ + default void allocation(XxlJobInfo jobInfo){} + + /** + * 用于节点起来的时候 重新计算任务的分配 + */ + default void flush(){} + + /** + * 启动初始化相关动作 + */ + default void init(boolean init){} +} diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/service/impl/AverageJobAllocation.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/service/impl/AverageJobAllocation.java new file mode 100644 index 00000000..42a6e3e6 --- /dev/null +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/service/impl/AverageJobAllocation.java @@ -0,0 +1,149 @@ +package com.xxl.job.admin.service.impl; + +import com.xxl.job.admin.core.conf.XxlJobAdminConfig; +import com.xxl.job.admin.core.model.XxlJobCluster; +import com.xxl.job.admin.core.model.XxlJobInfo; +import com.xxl.job.admin.service.JobAllocation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.util.CollectionUtils; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; + +/** + * @author liyong + */ +public class AverageJobAllocation implements JobAllocation { + + private static Logger logger = LoggerFactory.getLogger(AverageJobAllocation.class); + + + private static int oneMinute = 60; + private static int tenMinutes = oneMinute * 10; + private static int oneHour = tenMinutes * 6; + private static int oneDay = oneHour * 24; + + private Map averageMap = new ConcurrentHashMap<>(); + private Map> ipMap = new ConcurrentHashMap<>(); + + { + // 防止执行频率高的 扎堆在一起 + averageMap.put(oneMinute, new AtomicInteger(0)); + averageMap.put(tenMinutes, new AtomicInteger(0)); + averageMap.put(oneHour, new AtomicInteger(0)); + averageMap.put(oneDay, new AtomicInteger(0)); + averageMap.put(Integer.MAX_VALUE, new AtomicInteger(0)); + } + + @Override + public void allocation(XxlJobInfo jobInfo) { + List all = XxlJobAdminConfig.getAdminConfig().getXxlJobClusterDao().findAll(); + List ipList = all.stream().filter(x -> x.getUpdateTime().getTime() > System.currentTimeMillis() - 60 * 1000) + .map(XxlJobCluster::getHostName).collect(Collectors.toList()); + + if (ipList.isEmpty()) { + jobInfo.setHostName(XxlJobAdminConfig.getAdminConfig().getHostName()); + return; + } + + Integer key = getKey(jobInfo.getTriggerNextTime() - System.currentTimeMillis()); + int i = averageMap.get(key).incrementAndGet(); + + jobInfo.setHostName(ipList.get(i % ipList.size())); + + } + + @Override + public void init(boolean init) { + XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().initLockStatus(XxlJobAdminConfig.getAdminConfig().getHostName(),init); + } + + @Override + public void flush() { + Connection conn = null; + Boolean connAutoCommit = null; + PreparedStatement preparedStatement = null; + + try { + conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection(); + connAutoCommit = conn.getAutoCommit(); + conn.setAutoCommit(false); + + preparedStatement = conn.prepareStatement("select * from xxl_job_lock where lock_name = 'flush_lock' for update"); + preparedStatement.execute(); + + recursion(0, conn); + } catch (Exception e) { + logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread error:{}", e); + } finally { + // commit + if (conn != null) { + try { + conn.commit(); + } catch (SQLException e) { + logger.error(e.getMessage(), e); + } + try { + conn.setAutoCommit(connAutoCommit); + } catch (SQLException e) { + logger.error(e.getMessage(), e); + } + try { + conn.close(); + } catch (SQLException e) { + logger.error(e.getMessage(), e); + } + } + + // close PreparedStatement + if (null != preparedStatement) { + try { + preparedStatement.close(); + } catch (SQLException e) { + logger.error(e.getMessage(), e); + } + } + } + } + + private void recursion(int id, Connection conn) { + List list = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().pageById(id); + if (CollectionUtils.isEmpty(list)) + return; + Map> map = new HashMap<>(); + list.forEach(x -> { + allocation(x); + List ids = map.getOrDefault(x.getHostName(), new ArrayList<>()); + ids.add(x.getId()); + map.put(x.getHostName(), ids); + }); + map.forEach((k, v) -> XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().updateHostNameByIds(k, v)); + try { + conn.commit(); + } catch (SQLException e) { + logger.error(e.getMessage(), e); + } + recursion(list.get(list.size() - 1).getId(), conn); + } + + private Integer getKey(long time) { + if (time < oneMinute) { + return oneMinute; + } else if (time < tenMinutes) { + return tenMinutes; + } else if (time < oneHour) { + return oneHour; + } else if (time < oneDay) { + return oneDay; + } else { + return Integer.MAX_VALUE; + } + } + +} diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/service/impl/XxlJobServiceImpl.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/service/impl/XxlJobServiceImpl.java index 530ee41c..09022c5a 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/service/impl/XxlJobServiceImpl.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/service/impl/XxlJobServiceImpl.java @@ -1,5 +1,6 @@ package com.xxl.job.admin.service.impl; +import com.xxl.job.admin.core.conf.XxlJobAdminConfig; import com.xxl.job.admin.core.cron.CronExpression; import com.xxl.job.admin.core.model.XxlJobGroup; import com.xxl.job.admin.core.model.XxlJobInfo; @@ -10,6 +11,7 @@ import com.xxl.job.admin.core.scheduler.ScheduleTypeEnum; import com.xxl.job.admin.core.thread.JobScheduleHelper; import com.xxl.job.admin.core.util.I18nUtil; import com.xxl.job.admin.dao.*; +import com.xxl.job.admin.service.JobAllocation; import com.xxl.job.admin.service.XxlJobService; import com.xxl.job.core.biz.model.ReturnT; import com.xxl.job.core.enums.ExecutorBlockStrategyEnum; @@ -41,19 +43,19 @@ public class XxlJobServiceImpl implements XxlJobService { private XxlJobLogGlueDao xxlJobLogGlueDao; @Resource private XxlJobLogReportDao xxlJobLogReportDao; - + @Override public Map pageList(int start, int length, int jobGroup, int triggerStatus, String jobDesc, String executorHandler, String author) { // page list List list = xxlJobInfoDao.pageList(start, length, jobGroup, triggerStatus, jobDesc, executorHandler, author); int list_count = xxlJobInfoDao.pageListCount(start, length, jobGroup, triggerStatus, jobDesc, executorHandler, author); - + // package result Map maps = new HashMap(); - maps.put("recordsTotal", list_count); // 总记录数 - maps.put("recordsFiltered", list_count); // 过滤后的总记录数 - maps.put("data", list); // 分页列表 + maps.put("recordsTotal", list_count); // 总记录数 + maps.put("recordsFiltered", list_count); // 过滤后的总记录数 + maps.put("data", list); // 分页列表 return maps; } @@ -328,6 +330,7 @@ public class XxlJobServiceImpl implements XxlJobService { xxlJobInfo.setTriggerNextTime(nextTriggerTime); xxlJobInfo.setUpdateTime(new Date()); + XxlJobAdminConfig.getAdminConfig().getJobAllocation().allocation(xxlJobInfo); xxlJobInfoDao.update(xxlJobInfo); return ReturnT.SUCCESS; } diff --git a/xxl-job-admin/src/main/resources/application.properties b/xxl-job-admin/src/main/resources/application.properties index c9ef4bfa..dd2b1d74 100644 --- a/xxl-job-admin/src/main/resources/application.properties +++ b/xxl-job-admin/src/main/resources/application.properties @@ -23,9 +23,9 @@ mybatis.mapper-locations=classpath:/mybatis-mapper/*Mapper.xml #mybatis.type-aliases-package=com.xxl.job.admin.core.model ### xxl-job, datasource -spring.datasource.url=jdbc:mysql://47.97.249.124:3306/xxl_job?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&serverTimezone=Asia/Shanghai +spring.datasource.url=jdbc:mysql://localhost/xxl_job?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&serverTimezone=Asia/Shanghai spring.datasource.username=root -spring.datasource.password=123456 +spring.datasource.password=12345678 spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver ### datasource-pool @@ -63,3 +63,8 @@ xxl.job.triggerpool.slow.max=100 ### xxl-job, log retention days xxl.job.logretentiondays=30 + +xxl.job.cluster.host.name= +xxl.job.cluster.enable=true + + diff --git a/xxl-job-admin/src/main/resources/logback.xml b/xxl-job-admin/src/main/resources/logback.xml index d4b08c24..154b272b 100644 --- a/xxl-job-admin/src/main/resources/logback.xml +++ b/xxl-job-admin/src/main/resources/logback.xml @@ -2,7 +2,7 @@ logback - + diff --git a/xxl-job-admin/src/main/resources/mybatis-mapper/XxlJobClusterMapper.xml b/xxl-job-admin/src/main/resources/mybatis-mapper/XxlJobClusterMapper.xml new file mode 100644 index 00000000..5bbd8c63 --- /dev/null +++ b/xxl-job-admin/src/main/resources/mybatis-mapper/XxlJobClusterMapper.xml @@ -0,0 +1,17 @@ + + + + + + replace into xxl_job_cluster values(#{ip}, now()); + + + delete from xxl_job_cluster where update_time <= #{time} + + + + + \ No newline at end of file diff --git a/xxl-job-admin/src/main/resources/mybatis-mapper/XxlJobInfoMapper.xml b/xxl-job-admin/src/main/resources/mybatis-mapper/XxlJobInfoMapper.xml index 7b3c3a3e..0f544bca 100644 --- a/xxl-job-admin/src/main/resources/mybatis-mapper/XxlJobInfoMapper.xml +++ b/xxl-job-admin/src/main/resources/mybatis-mapper/XxlJobInfoMapper.xml @@ -36,6 +36,8 @@ + + @@ -224,17 +226,53 @@ FROM xxl_job_info AS t WHERE t.trigger_status = 1 and t.trigger_next_time #{maxNextTime} + + and t.host_name=#{hostName} + and t.lock_status=0 + ORDER BY id ASC LIMIT #{pagesize} + + UPDATE xxl_job_info SET trigger_last_time = #{triggerLastTime}, trigger_next_time = #{triggerNextTime}, - trigger_status = #{triggerStatus} + trigger_status = #{triggerStatus}, + lock_status=#{lockStatus} WHERE id = #{id} + + update xxl_job_info set lock_status = 1 + where id in + + #{id} + + + + update xxl_job_info set host_name = #{hostName} + where id in + + #{id} + + + + update xxl_job_info a left join xxl_job_cluster b on a.host_name =b.host_name and b.update_time>=DATE_SUB(now(),INTERVAL 1 MINUTE) + set a.lock_status = 0 + where (b.host_name is null + + or a.host_name =#{hostName} + + ) and a.lock_status=1 + \ No newline at end of file diff --git a/xxl-job-executor-samples/xxl-job-executor-sample-springboot/src/main/java/com/xxl/job/executor/service/jobhandler/SampleXxlJob.java b/xxl-job-executor-samples/xxl-job-executor-sample-springboot/src/main/java/com/xxl/job/executor/service/jobhandler/SampleXxlJob.java index 759d6625..a1000951 100644 --- a/xxl-job-executor-samples/xxl-job-executor-sample-springboot/src/main/java/com/xxl/job/executor/service/jobhandler/SampleXxlJob.java +++ b/xxl-job-executor-samples/xxl-job-executor-sample-springboot/src/main/java/com/xxl/job/executor/service/jobhandler/SampleXxlJob.java @@ -1,5 +1,6 @@ package com.xxl.job.executor.service.jobhandler; +import com.xxl.job.core.context.XxlJobContext; import com.xxl.job.core.context.XxlJobHelper; import com.xxl.job.core.handler.annotation.XxlJob; import org.slf4j.Logger; @@ -36,6 +37,7 @@ public class SampleXxlJob { */ @XxlJob("demoJobHandler") public void demoJobHandler() throws Exception { + logger.info(XxlJobHelper.getJobParam()); XxlJobHelper.log("XXL-JOB, Hello World."); for (int i = 0; i < 5; i++) { diff --git a/xxl-job-executor-samples/xxl-job-executor-sample-springboot/src/main/resources/application.properties b/xxl-job-executor-samples/xxl-job-executor-sample-springboot/src/main/resources/application.properties index 66403471..bfd8eabb 100644 --- a/xxl-job-executor-samples/xxl-job-executor-sample-springboot/src/main/resources/application.properties +++ b/xxl-job-executor-samples/xxl-job-executor-sample-springboot/src/main/resources/application.properties @@ -1,5 +1,5 @@ # web port -server.port=8081 +server.port=8084 # no web #spring.main.web-environment=false @@ -8,7 +8,7 @@ logging.config=classpath:logback.xml ### xxl-job admin address list, such as "http://address" or "http://address01,http://address02" -xxl.job.admin.addresses=http://127.0.0.1:8080/xxl-job-admin +xxl.job.admin.addresses=http://127.0.0.1:8081/xxl-job-admin ### xxl-job, access token xxl.job.accessToken= @@ -22,6 +22,6 @@ xxl.job.executor.address= xxl.job.executor.ip= xxl.job.executor.port=9999 ### xxl-job executor log-path -xxl.job.executor.logpath=/data/applogs/xxl-job/jobhandler +xxl.job.executor.logpath=../data/applogs/xxl-job/jobhandler ### xxl-job executor log-retention-days xxl.job.executor.logretentiondays=30 diff --git a/xxl-job-executor-samples/xxl-job-executor-sample-springboot/src/main/resources/logback.xml b/xxl-job-executor-samples/xxl-job-executor-sample-springboot/src/main/resources/logback.xml index d5a0d2ca..89f526de 100644 --- a/xxl-job-executor-samples/xxl-job-executor-sample-springboot/src/main/resources/logback.xml +++ b/xxl-job-executor-samples/xxl-job-executor-sample-springboot/src/main/resources/logback.xml @@ -2,7 +2,7 @@ logback - +