diff --git a/doc/db/tables_xxl_job.sql b/doc/db/tables_xxl_job.sql index 9002d076..ae61bd0a 100644 --- a/doc/db/tables_xxl_job.sql +++ b/doc/db/tables_xxl_job.sql @@ -32,7 +32,11 @@ CREATE TABLE `xxl_job_info` ( `trigger_status` tinyint(4) NOT NULL DEFAULT '0' COMMENT '调度状态:0-停止,1-运行', `trigger_last_time` bigint(13) NOT NULL DEFAULT '0' COMMENT '上次调度时间', `trigger_next_time` bigint(13) NOT NULL DEFAULT '0' COMMENT '下次调度时间', - PRIMARY KEY (`id`) + `host_name` varchar(100) NOT NULL DEFAULT '' COMMENT '实例名', + `lock_status` tinyint(4) NOT NULL DEFAULT '0' COMMENT '锁状态:1-改任务锁定中 0-未锁定', + PRIMARY KEY (`id`), + KEY `idx_host_name` (`host_name`), + KEY `idx_lock` (`lock_status`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; CREATE TABLE `xxl_job_log` ( @@ -113,10 +117,18 @@ CREATE TABLE `xxl_job_lock` ( PRIMARY KEY (`lock_name`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; +CREATE TABLE `xxl_job_cluster`( + `host_name` varchar(100) NOT NULL COMMENT '实例名', + `update_time` datetime DEFAULT NULL comment '更新时间', + PRIMARY KEY (`host_name`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; + INSERT INTO `xxl_job_group`(`id`, `app_name`, `title`, `address_type`, `address_list`, `update_time`) VALUES (1, 'xxl-job-executor-sample', '示例执行器', 0, NULL, '2018-11-03 22:21:31' ); INSERT INTO `xxl_job_info`(`id`, `job_group`, `job_desc`, `add_time`, `update_time`, `author`, `alarm_email`, `schedule_type`, `schedule_conf`, `misfire_strategy`, `executor_route_strategy`, `executor_handler`, `executor_param`, `executor_block_strategy`, `executor_timeout`, `executor_fail_retry_count`, `glue_type`, `glue_source`, `glue_remark`, `glue_updatetime`, `child_jobid`) VALUES (1, 1, '测试任务1', '2018-11-03 22:21:31', '2018-11-03 22:21:31', 'XXL', '', 'CRON', '0 0 0 * * ? *', 'DO_NOTHING', 'FIRST', 'demoJobHandler', '', 'SERIAL_EXECUTION', 0, 0, 'BEAN', '', 'GLUE代码初始化', '2018-11-03 22:21:31', ''); INSERT INTO `xxl_job_user`(`id`, `username`, `password`, `role`, `permission`) VALUES (1, 'admin', 'e10adc3949ba59abbe56e057f20f883e', 1, NULL); INSERT INTO `xxl_job_lock` ( `lock_name`) VALUES ( 'schedule_lock'); +INSERT INTO `xxl_job_lock` ( `lock_name`) VALUES ( 'flush_lock'); + commit; diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/conf/XxlJobAdminConfig.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/conf/XxlJobAdminConfig.java index 380b8a59..698bfad1 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/conf/XxlJobAdminConfig.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/conf/XxlJobAdminConfig.java @@ -1,8 +1,13 @@ package com.xxl.job.admin.core.conf; import com.xxl.job.admin.core.alarm.JobAlarmer; +import com.xxl.job.admin.core.model.XxlJobInfo; import com.xxl.job.admin.core.scheduler.XxlJobScheduler; import com.xxl.job.admin.dao.*; +import com.xxl.job.admin.service.JobAllocation; +import com.xxl.job.admin.service.impl.AverageJobAllocation; +import com.xxl.job.core.util.IpUtil; +import io.micrometer.core.instrument.util.StringUtils; import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Value; @@ -36,8 +41,11 @@ public class XxlJobAdminConfig implements InitializingBean, DisposableBean { public void afterPropertiesSet() throws Exception { adminConfig = this; + getJobAllocation().init(true); xxlJobScheduler = new XxlJobScheduler(); xxlJobScheduler.init(); + getJobAllocation().flush(); + } @Override @@ -67,6 +75,16 @@ public class XxlJobAdminConfig implements InitializingBean, DisposableBean { @Value("${xxl.job.logretentiondays}") private int logretentiondays; + + @Value("${xxl.job.cluster.host.name}") + private String hostName; + + @Value("${server.port}") + private int port; + + @Value("${xxl.job.cluster.enable:false}") + private boolean clusterEnable; + // dao, service @Resource @@ -79,6 +97,9 @@ public class XxlJobAdminConfig implements InitializingBean, DisposableBean { private XxlJobGroupDao xxlJobGroupDao; @Resource private XxlJobLogReportDao xxlJobLogReportDao; + + @Resource + private XxlJobClusterDao xxlJobClusterDao; @Resource private JavaMailSender mailSender; @Resource @@ -86,6 +107,23 @@ public class XxlJobAdminConfig implements InitializingBean, DisposableBean { @Resource private JobAlarmer jobAlarmer; + private JobAllocation jobAllocation = defaultJobAllocation; + + public String getHostName() { + return StringUtils.isBlank(hostName) ? IpUtil.getIpPort(port) : hostName; + } + + + private static JobAllocation defaultJobAllocation = new JobAllocation() { + @Override + public void allocation(XxlJobInfo jobInfo) { + } + + @Override + public void init(boolean init) { + } + }; + public String getI18n() { if (!Arrays.asList("zh_CN", "zh_TC", "en").contains(i18n)) { @@ -155,4 +193,20 @@ public class XxlJobAdminConfig implements InitializingBean, DisposableBean { return jobAlarmer; } + public XxlJobClusterDao getXxlJobClusterDao() { + return xxlJobClusterDao; + } + + public JobAllocation getJobAllocation() { + return jobAllocation.equals(defaultJobAllocation) && clusterEnable ? new AverageJobAllocation() : defaultJobAllocation; + } + + public void setJobAllocation(JobAllocation jobAllocation) { + this.jobAllocation = jobAllocation; + } + + public boolean isClusterEnable() { + return clusterEnable; + } + } diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/model/XxlJobCluster.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/model/XxlJobCluster.java new file mode 100644 index 00000000..4a7c2fb6 --- /dev/null +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/model/XxlJobCluster.java @@ -0,0 +1,26 @@ +package com.xxl.job.admin.core.model; + +import java.util.Date; + +public class XxlJobCluster { + + private String hostName; + + private Date updateTime; + + public String getHostName() { + return hostName; + } + + public void setHostName(String hostName) { + this.hostName = hostName; + } + + public Date getUpdateTime() { + return updateTime; + } + + public void setUpdateTime(Date updateTime) { + this.updateTime = updateTime; + } +} diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/model/XxlJobInfo.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/model/XxlJobInfo.java index e47b6dc6..0af4f663 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/model/XxlJobInfo.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/model/XxlJobInfo.java @@ -42,6 +42,10 @@ public class XxlJobInfo { private long triggerLastTime; // 上次调度时间 private long triggerNextTime; // 下次调度时间 + private String hostName; // 运行实例名称 + + private int lockStatus; + public int getId() { return id; @@ -234,4 +238,20 @@ public class XxlJobInfo { public void setTriggerNextTime(long triggerNextTime) { this.triggerNextTime = triggerNextTime; } + + public String getHostName() { + return hostName; + } + + public void setHostName(String hostName) { + this.hostName = hostName; + } + + public int getLockStatus() { + return lockStatus; + } + + public void setLockStatus(int lockStatus) { + this.lockStatus = lockStatus; + } } diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobRegistryHelper.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobRegistryHelper.java index 37edfd98..2fd76761 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobRegistryHelper.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobRegistryHelper.java @@ -2,10 +2,12 @@ package com.xxl.job.admin.core.thread; import com.xxl.job.admin.core.conf.XxlJobAdminConfig; import com.xxl.job.admin.core.model.XxlJobGroup; +import com.xxl.job.admin.core.model.XxlJobInfo; import com.xxl.job.admin.core.model.XxlJobRegistry; import com.xxl.job.core.biz.model.RegistryParam; import com.xxl.job.core.biz.model.ReturnT; import com.xxl.job.core.enums.RegistryConfig; +import com.xxl.job.core.util.DateUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.util.StringUtils; @@ -29,6 +31,13 @@ public class JobRegistryHelper { private Thread registryMonitorThread; private volatile boolean toStop = false; + private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + return new Thread(r, "cluster host registry"); + } + }); + public void start(){ // for registry or remove @@ -126,6 +135,28 @@ public class JobRegistryHelper { registryMonitorThread.setDaemon(true); registryMonitorThread.setName("xxl-job, admin JobRegistryMonitorHelper-registryMonitorThread"); registryMonitorThread.start(); + + this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { + + @Override + public void run() { + try { + XxlJobAdminConfig.getAdminConfig().getXxlJobClusterDao().replace(XxlJobAdminConfig.getAdminConfig().getHostName()); + + Date date = new Date(); + + XxlJobAdminConfig.getAdminConfig().getXxlJobClusterDao().delete(DateUtil.addMinutes(date,-5)); + + XxlJobInfo jobInfo = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().findOldClusterInfo(); + if(jobInfo!=null){ + XxlJobAdminConfig.getAdminConfig().getJobAllocation().init(false); + XxlJobAdminConfig.getAdminConfig().getJobAllocation().flush(); + } + } catch (Exception e) { + logger.error("ScheduledTask fetchNameServerAddr exception", e); + } + } + }, 0, 1000 * 30, TimeUnit.MILLISECONDS); } public void toStop(){ diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobScheduleHelper.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobScheduleHelper.java index 831bcf6a..daba04ff 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobScheduleHelper.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobScheduleHelper.java @@ -15,6 +15,7 @@ import java.sql.SQLException; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; /** * @author xuxueli 2019-05-21 @@ -77,8 +78,25 @@ public class JobScheduleHelper { // 1、pre read long nowTime = System.currentTimeMillis(); - List scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount); - if (scheduleList!=null && scheduleList.size()>0) { + List scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount + , XxlJobAdminConfig.getAdminConfig().getHostName(), + XxlJobAdminConfig.getAdminConfig().isClusterEnable()); + if (scheduleList!=null && !scheduleList.isEmpty()) { + + if(XxlJobAdminConfig.getAdminConfig().isClusterEnable()) { + XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().updateStatusById(scheduleList.stream() + .map(XxlJobInfo::getId).collect(Collectors.toList())); + + try { + conn.commit(); + } catch (SQLException e) { + if (!scheduleThreadToStop) { + logger.error(e.getMessage(), e); + } + } + } + + // 2、push time-ring for (XxlJobInfo jobInfo: scheduleList) { @@ -140,6 +158,7 @@ public class JobScheduleHelper { // 3、update trigger info for (XxlJobInfo jobInfo: scheduleList) { + jobInfo.setLockStatus(0); XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleUpdate(jobInfo); } diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/dao/XxlJobClusterDao.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/dao/XxlJobClusterDao.java new file mode 100644 index 00000000..3d93c4e0 --- /dev/null +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/dao/XxlJobClusterDao.java @@ -0,0 +1,20 @@ +package com.xxl.job.admin.dao; + +import com.xxl.job.admin.core.model.XxlJobCluster; +import org.apache.ibatis.annotations.Mapper; +import org.apache.ibatis.annotations.Param; + +import java.util.Date; +import java.util.List; + +/** + * @author liyong + */ +@Mapper +public interface XxlJobClusterDao { + void replace(@Param("ip") String ip); + + List findAll(); + + void delete(@Param("time") Date date); +} diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/dao/XxlJobInfoDao.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/dao/XxlJobInfoDao.java index d640efff..8adaca5b 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/dao/XxlJobInfoDao.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/dao/XxlJobInfoDao.java @@ -41,9 +41,20 @@ public interface XxlJobInfoDao { public int findAllCount(); - public List scheduleJobQuery(@Param("maxNextTime") long maxNextTime, @Param("pagesize") int pagesize ); + public List scheduleJobQuery(@Param("maxNextTime") long maxNextTime, @Param("pagesize") int pagesize, + @Param("hostName") String ip,@Param("clusterEnable") boolean clusterEnable); public int scheduleUpdate(XxlJobInfo xxlJobInfo); + void updateStatusById(@Param("list") List collect); + + List pageById(@Param("id") Integer id); + + void updateHostNameByIds(@Param("hostName") String k, @Param("ids") List v); + + void initLockStatus(@Param("hostName") String hostName,@Param("init") boolean init); + + XxlJobInfo findOldClusterInfo(); + } diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/service/JobAllocation.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/service/JobAllocation.java new file mode 100644 index 00000000..1ad6dc2e --- /dev/null +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/service/JobAllocation.java @@ -0,0 +1,25 @@ +package com.xxl.job.admin.service; + +import com.xxl.job.admin.core.model.XxlJobInfo; + +/** + * @author liyong + */ +public interface JobAllocation { + + /** + * 计算单个任务的分配 + * @param jobInfo + */ + default void allocation(XxlJobInfo jobInfo){} + + /** + * 用于节点起来的时候 重新计算任务的分配 + */ + default void flush(){} + + /** + * 启动初始化相关动作 + */ + default void init(boolean init){} +} diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/service/impl/AverageJobAllocation.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/service/impl/AverageJobAllocation.java new file mode 100644 index 00000000..4f8c15d6 --- /dev/null +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/service/impl/AverageJobAllocation.java @@ -0,0 +1,150 @@ +package com.xxl.job.admin.service.impl; + +import com.xxl.job.admin.core.conf.XxlJobAdminConfig; +import com.xxl.job.admin.core.model.XxlJobCluster; +import com.xxl.job.admin.core.model.XxlJobInfo; +import com.xxl.job.admin.service.JobAllocation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.util.CollectionUtils; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; + +/** + * @author liyong + */ +public class AverageJobAllocation implements JobAllocation { + + private static Logger logger = LoggerFactory.getLogger(AverageJobAllocation.class); + + + private static int oneMinute = 60; + private static int tenMinutes = oneMinute * 10; + private static int oneHour = tenMinutes * 6; + private static int oneDay = oneHour * 24; + + private Map averageMap = new ConcurrentHashMap<>(); + private Map> ipMap = new ConcurrentHashMap<>(); + + { + // 防止执行频率高的 扎堆在一起 + averageMap.put(oneMinute, new AtomicInteger(0)); + averageMap.put(tenMinutes, new AtomicInteger(0)); + averageMap.put(oneHour, new AtomicInteger(0)); + averageMap.put(oneDay, new AtomicInteger(0)); + averageMap.put(Integer.MAX_VALUE, new AtomicInteger(0)); + } + + @Override + public void allocation(XxlJobInfo jobInfo) { + List all = XxlJobAdminConfig.getAdminConfig().getXxlJobClusterDao().findAll(); + List ipList = all.stream().filter(x -> x.getUpdateTime().getTime() > System.currentTimeMillis() - 60 * 1000) + .map(XxlJobCluster::getHostName).collect(Collectors.toList()); + + if (ipList.isEmpty()) { + jobInfo.setHostName(XxlJobAdminConfig.getAdminConfig().getHostName()); + return; + } + + Integer key = getKey(jobInfo.getTriggerNextTime() - System.currentTimeMillis()); + int i = averageMap.get(key).incrementAndGet(); + + jobInfo.setHostName(ipList.get(i % ipList.size())); + + } + + @Override + public void init(boolean init) { + XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().initLockStatus(XxlJobAdminConfig.getAdminConfig().getHostName(),init); + } + + @Override + public void flush() { + Connection conn = null; + Boolean connAutoCommit = null; + PreparedStatement preparedStatement = null; + + try { + conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection(); + connAutoCommit = conn.getAutoCommit(); + conn.setAutoCommit(false); + + preparedStatement = conn.prepareStatement("select * from xxl_job_lock where lock_name = 'flush_lock' for update"); + preparedStatement.execute(); + + recursion(0, conn); + } catch (Exception e) { + logger.error(">>>>>>>>>>> xxl-job, cluster flush error:{}", e); + } finally { + // commit + if (conn != null) { + try { + conn.commit(); + } catch (SQLException e) { + logger.error(e.getMessage(), e); + } + try { + conn.setAutoCommit(connAutoCommit); + } catch (SQLException e) { + logger.error(e.getMessage(), e); + } + try { + conn.close(); + } catch (SQLException e) { + logger.error(e.getMessage(), e); + } + } + + // close PreparedStatement + if (null != preparedStatement) { + try { + preparedStatement.close(); + } catch (SQLException e) { + logger.error(e.getMessage(), e); + } + } + } + } + + private void recursion(int id, Connection conn) { + List list = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().pageById(id); + if (CollectionUtils.isEmpty(list)) + return; + Map> map = new HashMap<>(); + list.forEach(x -> { + allocation(x); + List ids = map.getOrDefault(x.getHostName(), new ArrayList<>()); + ids.add(x.getId()); + map.put(x.getHostName(), ids); + }); + map.forEach((k, v) -> XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().updateHostNameByIds(k, v)); + try { + conn.commit(); + } catch (SQLException e) { + logger.error(e.getMessage(), e); + } + recursion(list.get(list.size() - 1).getId(), conn); + } + + private Integer getKey(long time) { + if (time < oneMinute) { + return oneMinute; + } else if (time < tenMinutes) { + return tenMinutes; + } else if (time < oneHour) { + return oneHour; + } else if (time < oneDay) { + return oneDay; + } else { + return Integer.MAX_VALUE; + } + } + +} + diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/service/impl/XxlJobServiceImpl.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/service/impl/XxlJobServiceImpl.java index 530ee41c..1f5cae91 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/service/impl/XxlJobServiceImpl.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/service/impl/XxlJobServiceImpl.java @@ -1,5 +1,6 @@ package com.xxl.job.admin.service.impl; +import com.xxl.job.admin.core.conf.XxlJobAdminConfig; import com.xxl.job.admin.core.cron.CronExpression; import com.xxl.job.admin.core.model.XxlJobGroup; import com.xxl.job.admin.core.model.XxlJobInfo; @@ -328,6 +329,7 @@ public class XxlJobServiceImpl implements XxlJobService { xxlJobInfo.setTriggerNextTime(nextTriggerTime); xxlJobInfo.setUpdateTime(new Date()); + XxlJobAdminConfig.getAdminConfig().getJobAllocation().allocation(xxlJobInfo); xxlJobInfoDao.update(xxlJobInfo); return ReturnT.SUCCESS; } diff --git a/xxl-job-admin/src/main/resources/application.properties b/xxl-job-admin/src/main/resources/application.properties index 8727b6c7..810e4f40 100644 --- a/xxl-job-admin/src/main/resources/application.properties +++ b/xxl-job-admin/src/main/resources/application.properties @@ -63,3 +63,7 @@ xxl.job.triggerpool.slow.max=100 ### xxl-job, log retention days xxl.job.logretentiondays=30 + +### xxl-job cluster +xxl.job.cluster.host.name= +xxl.job.cluster.enable=true diff --git a/xxl-job-admin/src/main/resources/mybatis-mapper/XxlJobClusterMapper.xml b/xxl-job-admin/src/main/resources/mybatis-mapper/XxlJobClusterMapper.xml new file mode 100644 index 00000000..5bbd8c63 --- /dev/null +++ b/xxl-job-admin/src/main/resources/mybatis-mapper/XxlJobClusterMapper.xml @@ -0,0 +1,17 @@ + + + + + + replace into xxl_job_cluster values(#{ip}, now()); + + + delete from xxl_job_cluster where update_time <= #{time} + + + + + \ No newline at end of file diff --git a/xxl-job-admin/src/main/resources/mybatis-mapper/XxlJobInfoMapper.xml b/xxl-job-admin/src/main/resources/mybatis-mapper/XxlJobInfoMapper.xml index 7b3c3a3e..764282f2 100644 --- a/xxl-job-admin/src/main/resources/mybatis-mapper/XxlJobInfoMapper.xml +++ b/xxl-job-admin/src/main/resources/mybatis-mapper/XxlJobInfoMapper.xml @@ -36,6 +36,9 @@ + + + @@ -224,6 +227,10 @@ FROM xxl_job_info AS t WHERE t.trigger_status = 1 and t.trigger_next_time #{maxNextTime} + + and t.host_name=#{hostName} + and t.lock_status=0 + ORDER BY id ASC LIMIT #{pagesize} @@ -233,8 +240,40 @@ SET trigger_last_time = #{triggerLastTime}, trigger_next_time = #{triggerNextTime}, - trigger_status = #{triggerStatus} + trigger_status = #{triggerStatus}, + lock_status=#{lockStatus} WHERE id = #{id} + + + + update xxl_job_info set lock_status = 1 + where id in + + #{id} + + + + update xxl_job_info set host_name = #{hostName} + where id in + + #{id} + + + + update xxl_job_info a left join xxl_job_cluster b on a.host_name =b.host_name and b.update_time>=DATE_SUB(now(),INTERVAL 1 MINUTE) + set a.lock_status = 0 + where (b.host_name is null + + or a.host_name =#{hostName} + + ) and a.lock_status=1 + \ No newline at end of file diff --git a/xxl-job-executor-samples/xxl-job-executor-sample-springboot/src/main/java/com/xxl/job/executor/service/jobhandler/SampleXxlJob.java b/xxl-job-executor-samples/xxl-job-executor-sample-springboot/src/main/java/com/xxl/job/executor/service/jobhandler/SampleXxlJob.java index 759d6625..865543ab 100644 --- a/xxl-job-executor-samples/xxl-job-executor-sample-springboot/src/main/java/com/xxl/job/executor/service/jobhandler/SampleXxlJob.java +++ b/xxl-job-executor-samples/xxl-job-executor-sample-springboot/src/main/java/com/xxl/job/executor/service/jobhandler/SampleXxlJob.java @@ -36,6 +36,7 @@ public class SampleXxlJob { */ @XxlJob("demoJobHandler") public void demoJobHandler() throws Exception { + logger.info(XxlJobHelper.getJobParam()); XxlJobHelper.log("XXL-JOB, Hello World."); for (int i = 0; i < 5; i++) {