diff --git a/doc/db/tables_xxl_job.sql b/doc/db/tables_xxl_job.sql index 9002d076..61851114 100644 --- a/doc/db/tables_xxl_job.sql +++ b/doc/db/tables_xxl_job.sql @@ -1,5 +1,5 @@ # -# XXL-JOB v2.4.1-SNAPSHOT +# XXL-JOB v2.3.1-SNAPSHOT # Copyright (c) 2015-present, xuxueli. CREATE database if NOT EXISTS `xxl_job` default character set utf8mb4 collate utf8mb4_unicode_ci; @@ -29,10 +29,14 @@ CREATE TABLE `xxl_job_info` ( `glue_remark` varchar(128) DEFAULT NULL COMMENT 'GLUE备注', `glue_updatetime` datetime DEFAULT NULL COMMENT 'GLUE更新时间', `child_jobid` varchar(255) DEFAULT NULL COMMENT '子任务ID,多个逗号分隔', - `trigger_status` tinyint(4) NOT NULL DEFAULT '0' COMMENT '调度状态:0-停止,1-运行', + `trigger_status` tinyint(4) NOT NULL DEFAULT '0' COMMENT '调度状态:0-停止,1-运行 2.预备执行中', `trigger_last_time` bigint(13) NOT NULL DEFAULT '0' COMMENT '上次调度时间', `trigger_next_time` bigint(13) NOT NULL DEFAULT '0' COMMENT '下次调度时间', - PRIMARY KEY (`id`) + `host_name` varchar(100) NOT NULL DEFAULT '' COMMENT '实例名', + `lock_status` tinyint(4) NOT NULL DEFAULT '0' COMMENT '锁状态:1-改任务锁定中 0-未锁定', + PRIMARY KEY (`id`), + KEY `idx_host_name` (`host_name`), + KEY `idx_lock` (`lock_status`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; CREATE TABLE `xxl_job_log` ( @@ -113,10 +117,17 @@ CREATE TABLE `xxl_job_lock` ( PRIMARY KEY (`lock_name`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; +CREATE TABLE `xxl_job_cluster` ( + `host_name` varchar(100) NOT NULL COMMENT '实例名', + `update_time` datetime DEFAULT NULL comment '更新时间', + PRIMARY KEY (`host_name`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; + INSERT INTO `xxl_job_group`(`id`, `app_name`, `title`, `address_type`, `address_list`, `update_time`) VALUES (1, 'xxl-job-executor-sample', '示例执行器', 0, NULL, '2018-11-03 22:21:31' ); INSERT INTO `xxl_job_info`(`id`, `job_group`, `job_desc`, `add_time`, `update_time`, `author`, `alarm_email`, `schedule_type`, `schedule_conf`, `misfire_strategy`, `executor_route_strategy`, `executor_handler`, `executor_param`, `executor_block_strategy`, `executor_timeout`, `executor_fail_retry_count`, `glue_type`, `glue_source`, `glue_remark`, `glue_updatetime`, `child_jobid`) VALUES (1, 1, '测试任务1', '2018-11-03 22:21:31', '2018-11-03 22:21:31', 'XXL', '', 'CRON', '0 0 0 * * ? *', 'DO_NOTHING', 'FIRST', 'demoJobHandler', '', 'SERIAL_EXECUTION', 0, 0, 'BEAN', '', 'GLUE代码初始化', '2018-11-03 22:21:31', ''); INSERT INTO `xxl_job_user`(`id`, `username`, `password`, `role`, `permission`) VALUES (1, 'admin', 'e10adc3949ba59abbe56e057f20f883e', 1, NULL); INSERT INTO `xxl_job_lock` ( `lock_name`) VALUES ( 'schedule_lock'); +INSERT INTO `xxl_job_lock` ( `lock_name`) VALUES ( 'flush_lock'); commit; diff --git a/doc/note b/doc/note new file mode 100644 index 00000000..bd7cec84 --- /dev/null +++ b/doc/note @@ -0,0 +1,4 @@ +源码仓库地址 +https://github.com/xuxueli/xxl-job/releases +http://gitee.com/xuxueli0323/xxl-job/releases + diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/complete/XxlJobCompleter.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/complete/XxlJobCompleter.java index 279ad7d1..cd7557d5 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/complete/XxlJobCompleter.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/complete/XxlJobCompleter.java @@ -28,6 +28,7 @@ public class XxlJobCompleter { public static int updateHandleInfoAndFinish(XxlJobLog xxlJobLog) { // finish + //执行字任务 finishJob(xxlJobLog); // text最大64kb 避免长度过长 @@ -36,6 +37,7 @@ public class XxlJobCompleter { } // fresh handle + //更新log表 return XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateHandleInfo(xxlJobLog); } diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/conf/XxlJobAdminConfig.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/conf/XxlJobAdminConfig.java index 380b8a59..55a25146 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/conf/XxlJobAdminConfig.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/conf/XxlJobAdminConfig.java @@ -1,8 +1,15 @@ package com.xxl.job.admin.core.conf; import com.xxl.job.admin.core.alarm.JobAlarmer; +import com.xxl.job.admin.core.model.XxlJobCluster; +import com.xxl.job.admin.core.model.XxlJobInfo; import com.xxl.job.admin.core.scheduler.XxlJobScheduler; import com.xxl.job.admin.dao.*; +import com.xxl.job.admin.service.JobAllocation; +import com.xxl.job.admin.service.impl.AverageJobAllocation; +import com.xxl.job.core.util.IpUtil; +import io.micrometer.core.instrument.util.StringUtils; +import io.netty.util.internal.StringUtil; import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Value; @@ -12,6 +19,7 @@ import org.springframework.stereotype.Component; import javax.annotation.Resource; import javax.sql.DataSource; import java.util.Arrays; +import java.util.List; /** * xxl-job config @@ -23,6 +31,7 @@ import java.util.Arrays; public class XxlJobAdminConfig implements InitializingBean, DisposableBean { private static XxlJobAdminConfig adminConfig = null; + public static XxlJobAdminConfig getAdminConfig() { return adminConfig; } @@ -35,9 +44,10 @@ public class XxlJobAdminConfig implements InitializingBean, DisposableBean { @Override public void afterPropertiesSet() throws Exception { adminConfig = this; - + getJobAllocation().init(true); xxlJobScheduler = new XxlJobScheduler(); xxlJobScheduler.init(); + getJobAllocation().flush(); } @Override @@ -67,6 +77,17 @@ public class XxlJobAdminConfig implements InitializingBean, DisposableBean { @Value("${xxl.job.logretentiondays}") private int logretentiondays; + + @Value("${xxl.job.cluster.host.name}") + private String hostName; + + @Value("${server.port}") + private int port; + + @Value("${xxl.job.cluster.enable:false}") + private boolean clusterEnable; + + // dao, service @Resource @@ -80,12 +101,28 @@ public class XxlJobAdminConfig implements InitializingBean, DisposableBean { @Resource private XxlJobLogReportDao xxlJobLogReportDao; @Resource + private XxlJobClusterDao xxlJobClusterDao; + @Resource private JavaMailSender mailSender; @Resource private DataSource dataSource; @Resource private JobAlarmer jobAlarmer; + public String getHostName() { + return StringUtils.isBlank(hostName) ? IpUtil.getIpPort(port) : hostName; + } + + private JobAllocation jobAllocation = defaultJobAllocation; + private static JobAllocation defaultJobAllocation = new JobAllocation() { + @Override + public void allocation(XxlJobInfo jobInfo) { + } + + @Override + public void init(boolean init) { + } + }; public String getI18n() { if (!Arrays.asList("zh_CN", "zh_TC", "en").contains(i18n)) { @@ -155,4 +192,19 @@ public class XxlJobAdminConfig implements InitializingBean, DisposableBean { return jobAlarmer; } + public XxlJobClusterDao getXxlJobClusterDao() { + return xxlJobClusterDao; + } + + public JobAllocation getJobAllocation() { + return jobAllocation.equals(defaultJobAllocation) && clusterEnable ? new AverageJobAllocation() : defaultJobAllocation; + } + + public void setJobAllocation(JobAllocation jobAllocation) { + this.jobAllocation = jobAllocation; + } + + public boolean isClusterEnable() { + return clusterEnable; + } } diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/model/XxlJobCluster.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/model/XxlJobCluster.java new file mode 100644 index 00000000..4a7c2fb6 --- /dev/null +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/model/XxlJobCluster.java @@ -0,0 +1,26 @@ +package com.xxl.job.admin.core.model; + +import java.util.Date; + +public class XxlJobCluster { + + private String hostName; + + private Date updateTime; + + public String getHostName() { + return hostName; + } + + public void setHostName(String hostName) { + this.hostName = hostName; + } + + public Date getUpdateTime() { + return updateTime; + } + + public void setUpdateTime(Date updateTime) { + this.updateTime = updateTime; + } +} diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/model/XxlJobInfo.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/model/XxlJobInfo.java index e47b6dc6..5b38fb45 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/model/XxlJobInfo.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/model/XxlJobInfo.java @@ -41,7 +41,9 @@ public class XxlJobInfo { private int triggerStatus; // 调度状态:0-停止,1-运行 private long triggerLastTime; // 上次调度时间 private long triggerNextTime; // 下次调度时间 + private String hostName; // 运行实例名称 + private int lockStatus; public int getId() { return id; @@ -234,4 +236,21 @@ public class XxlJobInfo { public void setTriggerNextTime(long triggerNextTime) { this.triggerNextTime = triggerNextTime; } + + public String getHostName() { + return hostName; + } + + public void setHostName(String hostName) { + this.hostName = hostName; + } + + public int getLockStatus() { + return lockStatus; + } + + public void setLockStatus(int lockStatus) { + this.lockStatus = lockStatus; + } + } diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/strategy/ExecutorRouteBusyover.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/strategy/ExecutorRouteBusyover.java index 868560fc..0b8bd355 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/strategy/ExecutorRouteBusyover.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/strategy/ExecutorRouteBusyover.java @@ -12,6 +12,7 @@ import java.util.List; /** * Created by xuxueli on 17/3/10. + * 通过调用 客户端 idleBeat 接口 */ public class ExecutorRouteBusyover extends ExecutorRouter { diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/strategy/ExecutorRouteFailover.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/strategy/ExecutorRouteFailover.java index a2e4c909..ea1e4ade 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/strategy/ExecutorRouteFailover.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/strategy/ExecutorRouteFailover.java @@ -11,6 +11,7 @@ import java.util.List; /** * Created by xuxueli on 17/3/10. + * 通过调用 客户端 beat接口 返回第一个SUCCESS的 */ public class ExecutorRouteFailover extends ExecutorRouter { diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/strategy/ExecutorRouteLFU.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/strategy/ExecutorRouteLFU.java index 9df19726..5151a44c 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/strategy/ExecutorRouteLFU.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/strategy/ExecutorRouteLFU.java @@ -13,6 +13,7 @@ import java.util.concurrent.ConcurrentMap; * a(*)、LFU(Least Frequently Used):最不经常使用,频率/次数 * b、LRU(Least Recently Used):最近最久未使用,时间 * + * 统计24小时内调用次数 最少的 每次调用通过map+1记录 如果是第一次则是随机生成调用次数 这个策略主要是当前节点调用次数 不是 被调方的 调用次数 如果是多节点不符合预期结果 * Created by xuxueli on 17/3/10. */ public class ExecutorRouteLFU extends ExecutorRouter { diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/strategy/ExecutorRouteLRU.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/strategy/ExecutorRouteLRU.java index 2d540067..ee3d61bb 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/strategy/ExecutorRouteLRU.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/strategy/ExecutorRouteLRU.java @@ -15,6 +15,7 @@ import java.util.concurrent.ConcurrentMap; * a、LFU(Least Frequently Used):最不经常使用,频率/次数 * b(*)、LRU(Least Recently Used):最近最久未使用,时间 * + * 基于 LinkedHashMap (accessorder=true)的访问顺序 来控制 * Created by xuxueli on 17/3/10. */ public class ExecutorRouteLRU extends ExecutorRouter { diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/scheduler/XxlJobScheduler.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/scheduler/XxlJobScheduler.java index bb2cda8b..8d970c45 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/scheduler/XxlJobScheduler.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/scheduler/XxlJobScheduler.java @@ -22,24 +22,33 @@ public class XxlJobScheduler { public void init() throws Exception { // init i18n + // TODO 国际化阻塞策略 initI18n(); // admin trigger pool start + // TODO 初始化 fastTriggerPool(用于 执行任务(步骤四、步骤7) 、slowTriggerPool(用于 执行慢任务(步骤四、步骤7)) 线程池 + //TODO 注意最大线程数 后面会用于 计算一次循环从数据库拉取的任务数 JobTriggerPoolHelper.toStart(); // admin registry monitor run + //TODO 1.初始化registryOrRemoveThreadPool(用于 客户端注册) 2.开启 registryMonitorThread (用于扫描 有没有对应执行器的 客户端地址 每30秒扫描一次 JobRegistryHelper.getInstance().start(); // admin fail-monitor run + //TODO 步骤4 开启线程 扫描异常任务 重试机制 JobFailMonitorHelper.getInstance().start(); // admin lose-monitor run ( depend on JobTriggerPoolHelper ) + //TODO 初始化 callbackThreadPool(用于接收客户端执行状态) 线程池 启动 monitorThread 处理长期处于进行中的任务 JobCompleteHelper.getInstance().start(); // admin log report start + //todo 开启 logrThread 统计报表相关 间隔一分钟 + //TODO 每次统计都是统计一天的 没做大数据量的限制 过大时会影响效率 可以做成增量的 在获取结果时异步 统计 或者每次只统计1分钟的 JobLogReportHelper.getInstance().start(); // start-schedule ( depend on JobTriggerPoolHelper ) + //TODO 步骤7 任务的预处理 scheduleThread ringThread JobScheduleHelper.getInstance().start(); logger.info(">>>>>>>>> init xxl-job admin success."); diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobCompleteHelper.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobCompleteHelper.java index 5698926a..c3ce7f1a 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobCompleteHelper.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobCompleteHelper.java @@ -74,7 +74,7 @@ public class JobCompleteHelper { // monitor while (!toStop) { try { - // 任务结果丢失处理:调度记录停留在 "运行中" 状态超过10min,且对应执行器心跳注册失败不在线,则将本地调度主动标记失败; + //TODO 任务结果丢失处理:调度记录停留在 "运行中" 状态超过10min,且对应执行器心跳注册失败不在线,则将本地调度主动标记失败; Date losedTime = DateUtil.addMinutes(new Date(), -10); List losedJobIds = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findLostJobIds(losedTime); diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobFailMonitorHelper.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobFailMonitorHelper.java index 8409d7b3..0a2a6900 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobFailMonitorHelper.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobFailMonitorHelper.java @@ -43,30 +43,39 @@ public class JobFailMonitorHelper { for (long failLogId: failLogIds) { // lock log + //这里通过 依赖数据库事务 来进行避免 多节点时数据的重复操作问题 + //TODO 如果在这一步服务挂了 这批数据就会被下一个线程扫描为失败 int lockRet = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateAlarmStatus(failLogId, 0, -1); if (lockRet < 1) { continue; } + //查询执行记录及对应的任务信息 XxlJobLog log = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().load(failLogId); XxlJobInfo info = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(log.getJobId()); // 1、fail retry monitor + //TODO 校验重试次数 if (log.getExecutorFailRetryCount() > 0) { + //执行任务 JobTriggerPoolHelper.trigger(log.getJobId(), TriggerTypeEnum.RETRY, (log.getExecutorFailRetryCount()-1), log.getExecutorShardingParam(), log.getExecutorParam(), null); String retryMsg = "

>>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_type_retry") +"<<<<<<<<<<<
"; log.setTriggerMsg(log.getTriggerMsg() + retryMsg); + //更新记录 XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(log); } // 2、fail alarm monitor int newAlarmStatus = 0; // 告警状态:0-默认、-1=锁定状态、1-无需告警、2-告警成功、3-告警失败 if (info != null) { + //发送告警 不看 boolean alarmResult = XxlJobAdminConfig.getAdminConfig().getJobAlarmer().alarm(info, log); newAlarmStatus = alarmResult?2:3; } else { newAlarmStatus = 1; } + + //更新告警状态 XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateAlarmStatus(failLogId, -1, newAlarmStatus); } } diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobRegistryHelper.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobRegistryHelper.java index 37edfd98..44c2fbca 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobRegistryHelper.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobRegistryHelper.java @@ -1,17 +1,23 @@ package com.xxl.job.admin.core.thread; import com.xxl.job.admin.core.conf.XxlJobAdminConfig; +import com.xxl.job.admin.core.model.XxlJobCluster; import com.xxl.job.admin.core.model.XxlJobGroup; +import com.xxl.job.admin.core.model.XxlJobInfo; import com.xxl.job.admin.core.model.XxlJobRegistry; import com.xxl.job.core.biz.model.RegistryParam; import com.xxl.job.core.biz.model.ReturnT; import com.xxl.job.core.enums.RegistryConfig; +import com.xxl.job.core.util.DateUtil; +import com.xxl.job.core.util.IpUtil; +import org.apache.catalina.startup.HostConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.util.StringUtils; import java.util.*; import java.util.concurrent.*; +import java.util.stream.Collectors; /** * job registry instance @@ -26,9 +32,18 @@ public class JobRegistryHelper { } private ThreadPoolExecutor registryOrRemoveThreadPool = null; + private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + return new Thread(r, "cluster host registry"); + } + }); + private Thread registryMonitorThread; private volatile boolean toStop = false; + + public void start(){ // for registry or remove @@ -59,10 +74,12 @@ public class JobRegistryHelper { while (!toStop) { try { // auto registry group + // List groupList = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().findByAddressType(0); if (groupList!=null && !groupList.isEmpty()) { // remove dead address (admin/executor) + //TODO 查询有没有90秒之前注册过的节点 有则直接删除 List ids = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findDead(RegistryConfig.DEAD_TIMEOUT, new Date()); if (ids!=null && ids.size()>0) { XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().removeDead(ids); @@ -70,9 +87,11 @@ public class JobRegistryHelper { // fresh online address (admin/executor) HashMap> appAddressMap = new HashMap>(); + //TODO 查询 90秒内注册过的 List list = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findAll(RegistryConfig.DEAD_TIMEOUT, new Date()); if (list != null) { for (XxlJobRegistry item: list) { + //TODO if (RegistryConfig.RegistType.EXECUTOR.name().equals(item.getRegistryGroup())) { String appname = item.getRegistryKey(); List registryList = appAddressMap.get(appname); @@ -103,7 +122,7 @@ public class JobRegistryHelper { } group.setAddressList(addressListStr); group.setUpdateTime(new Date()); - + //TODO 循环更新 执行器 对应的 注册列表 XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().update(group); } } @@ -113,6 +132,7 @@ public class JobRegistryHelper { } } try { + //TODO 间隔30秒 TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT); } catch (InterruptedException e) { if (!toStop) { @@ -126,6 +146,29 @@ public class JobRegistryHelper { registryMonitorThread.setDaemon(true); registryMonitorThread.setName("xxl-job, admin JobRegistryMonitorHelper-registryMonitorThread"); registryMonitorThread.start(); + + + this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { + + @Override + public void run() { + try { + XxlJobAdminConfig.getAdminConfig().getXxlJobClusterDao().replace(XxlJobAdminConfig.getAdminConfig().getHostName()); + + Date date = new Date(); + + XxlJobAdminConfig.getAdminConfig().getXxlJobClusterDao().delete(DateUtil.addMinutes(date,-5)); + + XxlJobInfo jobInfo = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().findOldClusterInfo(); + if(jobInfo!=null){ + XxlJobAdminConfig.getAdminConfig().getJobAllocation().init(false); + XxlJobAdminConfig.getAdminConfig().getJobAllocation().flush(); + } + } catch (Exception e) { + logger.error("ScheduledTask fetchNameServerAddr exception", e); + } + } + }, 0, 1000 * 30, TimeUnit.MILLISECONDS); } public void toStop(){ diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobScheduleHelper.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobScheduleHelper.java index 831bcf6a..35baee67 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobScheduleHelper.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobScheduleHelper.java @@ -6,6 +6,7 @@ import com.xxl.job.admin.core.model.XxlJobInfo; import com.xxl.job.admin.core.scheduler.MisfireStrategyEnum; import com.xxl.job.admin.core.scheduler.ScheduleTypeEnum; import com.xxl.job.admin.core.trigger.TriggerTypeEnum; +import com.xxl.job.core.util.IpUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -15,6 +16,7 @@ import java.sql.SQLException; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; /** * @author xuxueli 2019-05-21 @@ -70,6 +72,7 @@ public class JobScheduleHelper { connAutoCommit = conn.getAutoCommit(); conn.setAutoCommit(false); + //TODO 通过这个来控制多节点只有一个节点会执行 low的一笔 preparedStatement = conn.prepareStatement( "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" ); preparedStatement.execute(); @@ -77,18 +80,37 @@ public class JobScheduleHelper { // 1、pre read long nowTime = System.currentTimeMillis(); - List scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount); + //TODO 下次执行时间 在5秒内的 整个框架的核心 + List scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao() + .scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount, XxlJobAdminConfig.getAdminConfig().getHostName(), + XxlJobAdminConfig.getAdminConfig().isClusterEnable()); if (scheduleList!=null && scheduleList.size()>0) { + + if(XxlJobAdminConfig.getAdminConfig().isClusterEnable()) { + XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().updateStatusById(scheduleList.stream() + .map(XxlJobInfo::getId).collect(Collectors.toList())); + + try { + conn.commit(); + } catch (SQLException e) { + if (!scheduleThreadToStop) { + logger.error(e.getMessage(), e); + } + } + } + scheduleList.forEach(x->logger.info(x.getId()+"")); // 2、push time-ring for (XxlJobInfo jobInfo: scheduleList) { // time-ring jump + // TODO 当前时间>下次执行时间+5秒 任务触发过期策略 5秒内不算过期 if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) { // 2.1、trigger-expire > 5s:pass && make next-trigger-time logger.warn(">>>>>>>>>>> xxl-job, schedule misfire, jobId = " + jobInfo.getId()); // 1、misfire match MisfireStrategyEnum misfireStrategyEnum = MisfireStrategyEnum.match(jobInfo.getMisfireStrategy(), MisfireStrategyEnum.DO_NOTHING); + //如果过期策略是立即执行一次则 执行 if (MisfireStrategyEnum.FIRE_ONCE_NOW == misfireStrategyEnum) { // FIRE_ONCE_NOW 》 trigger JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.MISFIRE, -1, null, null, null); @@ -96,25 +118,33 @@ public class JobScheduleHelper { } // 2、fresh next + //刷新下次执行时间 refreshNextValidTime(jobInfo, new Date()); - } else if (nowTime > jobInfo.getTriggerNextTime()) { + } + //TODO 当前时间>下次执行时间 过期了但是控制在5秒内不算过期 + else if (nowTime > jobInfo.getTriggerNextTime()) { // 2.2、trigger-expire < 5s:direct-trigger && make next-trigger-time // 1、trigger + //TODO 执行 JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null, null); logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() ); // 2、fresh next + //TODO 刷新下次执行时间 refreshNextValidTime(jobInfo, new Date()); // next-trigger-time in 5s, pre-read again + //TODO 如果任务在运行状态 且 下次执行时间在5秒内 则将 任务 缓存到 ringData if (jobInfo.getTriggerStatus()==1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) { // 1、make ring second + //TODO 获取下次执行时间 是哪一秒 作为缓存的key int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60); // 2、push time ring + //TODO 缓存至 ringData pushTimeRing(ringSecond, jobInfo.getId()); // 3、fresh next @@ -122,7 +152,9 @@ public class JobScheduleHelper { } - } else { + } + //还没到下次执行时间的 缓存到 ringData + else { // 2.3、trigger-pre-read:time-ring trigger && make next-trigger-time // 1、make ring second @@ -140,6 +172,7 @@ public class JobScheduleHelper { // 3、update trigger info for (XxlJobInfo jobInfo: scheduleList) { + jobInfo.setLockStatus(0); XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleUpdate(jobInfo); } @@ -199,6 +232,7 @@ public class JobScheduleHelper { if (cost < 1000) { // scan-overtime, not wait try { // pre-read period: success > scan each second; fail > skip this period; + //随机休眠 没任务则休眠5秒内 有任务则休眠1秒内 TimeUnit.MILLISECONDS.sleep((preReadSuc?1000:PRE_READ_MS) - System.currentTimeMillis()%1000); } catch (InterruptedException e) { if (!scheduleThreadToStop) { @@ -236,7 +270,8 @@ public class JobScheduleHelper { try { // second data List ringItemData = new ArrayList<>(); - int nowSecond = Calendar.getInstance().get(Calendar.SECOND); // 避免处理耗时太长,跨过刻度,向前校验一个刻度; + //TODO 避免处理耗时太长,跨过刻度,向前校验一个刻度; + int nowSecond = Calendar.getInstance().get(Calendar.SECOND); for (int i = 0; i < 2; i++) { List tmpData = ringData.remove( (nowSecond+60-i)%60 ); if (tmpData != null) { @@ -249,7 +284,7 @@ public class JobScheduleHelper { if (ringItemData.size() > 0) { // do trigger for (int jobId: ringItemData) { - // do trigger + //TODO do trigger JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null, null); } // clear @@ -355,6 +390,14 @@ public class JobScheduleHelper { // ---------------------- tools ---------------------- + + /** + * 获取下次执行时间 + * @param jobInfo + * @param fromTime + * @return + * @throws Exception + */ public static Date generateNextValidTime(XxlJobInfo jobInfo, Date fromTime) throws Exception { ScheduleTypeEnum scheduleTypeEnum = ScheduleTypeEnum.match(jobInfo.getScheduleType(), null); if (ScheduleTypeEnum.CRON == scheduleTypeEnum) { diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobTriggerPoolHelper.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobTriggerPoolHelper.java index 398713dd..f27b1bd5 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobTriggerPoolHelper.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobTriggerPoolHelper.java @@ -30,7 +30,7 @@ public class JobTriggerPoolHelper { XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax(), 60L, TimeUnit.SECONDS, - new LinkedBlockingQueue(1000), + new LinkedBlockingQueue(10000), new ThreadFactory() { @Override public Thread newThread(Runnable r) { @@ -43,7 +43,7 @@ public class JobTriggerPoolHelper { XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax(), 60L, TimeUnit.SECONDS, - new LinkedBlockingQueue(2000), + new LinkedBlockingQueue(20000), new ThreadFactory() { @Override public Thread newThread(Runnable r) { @@ -79,6 +79,7 @@ public class JobTriggerPoolHelper { // choose thread pool ThreadPoolExecutor triggerPool_ = fastTriggerPool; AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId); + //慢任务 超过 10次 则调用单独的线程池 if (jobTimeoutCount!=null && jobTimeoutCount.get() > 10) { // job-timeout 10 times in 1 min triggerPool_ = slowTriggerPool; } @@ -97,6 +98,7 @@ public class JobTriggerPoolHelper { logger.error(e.getMessage(), e); } finally { + //TODO 统计1分钟内 调用超过500ms的 // check timeout-count-map long minTim_now = System.currentTimeMillis()/60000; if (minTim != minTim_now) { diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/trigger/XxlJobTrigger.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/trigger/XxlJobTrigger.java index 748befc6..868a2e76 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/trigger/XxlJobTrigger.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/trigger/XxlJobTrigger.java @@ -49,6 +49,7 @@ public class XxlJobTrigger { String addressList) { // load data + //###############初始化 任务参数及执行器参数 XxlJobInfo jobInfo = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(jobId); if (jobInfo == null) { logger.warn(">>>>>>>>>>>> trigger fail, jobId invalid,jobId={}", jobId); @@ -76,6 +77,9 @@ public class XxlJobTrigger { shardingParam[1] = Integer.valueOf(shardingArr[1]); } } + //############################ + + //TODO 路由策略如果是 分片广播 则循环调用 if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null) && group.getRegistryList()!=null && !group.getRegistryList().isEmpty() && shardingParam==null) { @@ -86,6 +90,7 @@ public class XxlJobTrigger { if (shardingParam == null) { shardingParam = new int[]{0, 1}; } + //TODO processTrigger(group, jobInfo, finalFailRetryCount, triggerType, shardingParam[0], shardingParam[1]); } @@ -111,7 +116,9 @@ public class XxlJobTrigger { private static void processTrigger(XxlJobGroup group, XxlJobInfo jobInfo, int finalFailRetryCount, TriggerTypeEnum triggerType, int index, int total){ // param + //TODO 获取阻塞策略 ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(jobInfo.getExecutorBlockStrategy(), ExecutorBlockStrategyEnum.SERIAL_EXECUTION); // block strategy + //TODO 获取路由策略 ExecutorRouteStrategyEnum executorRouteStrategyEnum = ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null); // route strategy String shardingParam = (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==executorRouteStrategyEnum)?String.valueOf(index).concat("/").concat(String.valueOf(total)):null; @@ -142,6 +149,7 @@ public class XxlJobTrigger { String address = null; ReturnT routeAddressResult = null; if (group.getRegistryList()!=null && !group.getRegistryList().isEmpty()) { + //分片广播 if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum) { if (index < group.getRegistryList().size()) { address = group.getRegistryList().get(index); @@ -149,6 +157,7 @@ public class XxlJobTrigger { address = group.getRegistryList().get(0); } } else { + //TODO 基于不同的路由策略获取地址 routeAddressResult = executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList()); if (routeAddressResult.getCode() == ReturnT.SUCCESS_CODE) { address = routeAddressResult.getContent(); @@ -161,6 +170,7 @@ public class XxlJobTrigger { // 4、trigger remote executor ReturnT triggerResult = null; if (address != null) { + //TODO 执行任务 调用 客户端 run 接口 triggerResult = runExecutor(triggerParam, address); } else { triggerResult = new ReturnT(ReturnT.FAIL_CODE, null); @@ -193,6 +203,7 @@ public class XxlJobTrigger { //jobLog.setTriggerTime(); jobLog.setTriggerCode(triggerResult.getCode()); jobLog.setTriggerMsg(triggerMsgSb.toString()); + //TODO 更新调用记录 XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(jobLog); logger.debug(">>>>>>>>>>> xxl-job trigger end, jobId:{}", jobLog.getId()); diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/dao/XxlJobClusterDao.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/dao/XxlJobClusterDao.java new file mode 100644 index 00000000..8fb5a14a --- /dev/null +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/dao/XxlJobClusterDao.java @@ -0,0 +1,17 @@ +package com.xxl.job.admin.dao; + +import com.xxl.job.admin.core.model.XxlJobCluster; +import org.apache.ibatis.annotations.Mapper; +import org.apache.ibatis.annotations.Param; + +import java.util.Date; +import java.util.List; + +@Mapper +public interface XxlJobClusterDao { + void replace(@Param("ip") String ip); + + List findAll(); + + void delete(@Param("time") Date date); +} diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/dao/XxlJobInfoDao.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/dao/XxlJobInfoDao.java index d640efff..45564ecc 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/dao/XxlJobInfoDao.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/dao/XxlJobInfoDao.java @@ -9,41 +9,53 @@ import java.util.List; /** * job info + * * @author xuxueli 2016-1-12 18:03:45 */ @Mapper public interface XxlJobInfoDao { - public List pageList(@Param("offset") int offset, - @Param("pagesize") int pagesize, - @Param("jobGroup") int jobGroup, - @Param("triggerStatus") int triggerStatus, - @Param("jobDesc") String jobDesc, - @Param("executorHandler") String executorHandler, - @Param("author") String author); - public int pageListCount(@Param("offset") int offset, - @Param("pagesize") int pagesize, - @Param("jobGroup") int jobGroup, - @Param("triggerStatus") int triggerStatus, - @Param("jobDesc") String jobDesc, - @Param("executorHandler") String executorHandler, - @Param("author") String author); - - public int save(XxlJobInfo info); + public List pageList(@Param("offset") int offset, + @Param("pagesize") int pagesize, + @Param("jobGroup") int jobGroup, + @Param("triggerStatus") int triggerStatus, + @Param("jobDesc") String jobDesc, + @Param("executorHandler") String executorHandler, + @Param("author") String author); - public XxlJobInfo loadById(@Param("id") int id); - - public int update(XxlJobInfo xxlJobInfo); - - public int delete(@Param("id") long id); + public int pageListCount(@Param("offset") int offset, + @Param("pagesize") int pagesize, + @Param("jobGroup") int jobGroup, + @Param("triggerStatus") int triggerStatus, + @Param("jobDesc") String jobDesc, + @Param("executorHandler") String executorHandler, + @Param("author") String author); - public List getJobsByGroup(@Param("jobGroup") int jobGroup); + public int save(XxlJobInfo info); - public int findAllCount(); + public XxlJobInfo loadById(@Param("id") int id); - public List scheduleJobQuery(@Param("maxNextTime") long maxNextTime, @Param("pagesize") int pagesize ); + public int update(XxlJobInfo xxlJobInfo); - public int scheduleUpdate(XxlJobInfo xxlJobInfo); + public int delete(@Param("id") long id); + public List getJobsByGroup(@Param("jobGroup") int jobGroup); + public int findAllCount(); + + public List scheduleJobQuery(@Param("maxNextTime") long maxNextTime, @Param("pagesize") int pagesize, + @Param("hostName") String ip,@Param("clusterEnable") boolean clusterEnable); + + public int scheduleUpdate(XxlJobInfo xxlJobInfo); + + + void updateStatusById(@Param("list") List collect); + + List pageById(@Param("id") Integer id); + + void updateHostNameByIds(@Param("hostName") String k, @Param("ids") List v); + + void initLockStatus(@Param("hostName") String hostName,@Param("init") boolean init); + + XxlJobInfo findOldClusterInfo(); } diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/service/JobAllocation.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/service/JobAllocation.java new file mode 100644 index 00000000..245d1881 --- /dev/null +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/service/JobAllocation.java @@ -0,0 +1,22 @@ +package com.xxl.job.admin.service; + +import com.xxl.job.admin.core.model.XxlJobInfo; + +public interface JobAllocation { + + /** + * 计算单个任务的分配 + * @param jobInfo + */ + default void allocation(XxlJobInfo jobInfo){} + + /** + * 用于节点起来的时候 重新计算任务的分配 + */ + default void flush(){} + + /** + * 启动初始化相关动作 + */ + default void init(boolean init){} +} diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/service/impl/AverageJobAllocation.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/service/impl/AverageJobAllocation.java new file mode 100644 index 00000000..4f8c15d6 --- /dev/null +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/service/impl/AverageJobAllocation.java @@ -0,0 +1,150 @@ +package com.xxl.job.admin.service.impl; + +import com.xxl.job.admin.core.conf.XxlJobAdminConfig; +import com.xxl.job.admin.core.model.XxlJobCluster; +import com.xxl.job.admin.core.model.XxlJobInfo; +import com.xxl.job.admin.service.JobAllocation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.util.CollectionUtils; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; + +/** + * @author liyong + */ +public class AverageJobAllocation implements JobAllocation { + + private static Logger logger = LoggerFactory.getLogger(AverageJobAllocation.class); + + + private static int oneMinute = 60; + private static int tenMinutes = oneMinute * 10; + private static int oneHour = tenMinutes * 6; + private static int oneDay = oneHour * 24; + + private Map averageMap = new ConcurrentHashMap<>(); + private Map> ipMap = new ConcurrentHashMap<>(); + + { + // 防止执行频率高的 扎堆在一起 + averageMap.put(oneMinute, new AtomicInteger(0)); + averageMap.put(tenMinutes, new AtomicInteger(0)); + averageMap.put(oneHour, new AtomicInteger(0)); + averageMap.put(oneDay, new AtomicInteger(0)); + averageMap.put(Integer.MAX_VALUE, new AtomicInteger(0)); + } + + @Override + public void allocation(XxlJobInfo jobInfo) { + List all = XxlJobAdminConfig.getAdminConfig().getXxlJobClusterDao().findAll(); + List ipList = all.stream().filter(x -> x.getUpdateTime().getTime() > System.currentTimeMillis() - 60 * 1000) + .map(XxlJobCluster::getHostName).collect(Collectors.toList()); + + if (ipList.isEmpty()) { + jobInfo.setHostName(XxlJobAdminConfig.getAdminConfig().getHostName()); + return; + } + + Integer key = getKey(jobInfo.getTriggerNextTime() - System.currentTimeMillis()); + int i = averageMap.get(key).incrementAndGet(); + + jobInfo.setHostName(ipList.get(i % ipList.size())); + + } + + @Override + public void init(boolean init) { + XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().initLockStatus(XxlJobAdminConfig.getAdminConfig().getHostName(),init); + } + + @Override + public void flush() { + Connection conn = null; + Boolean connAutoCommit = null; + PreparedStatement preparedStatement = null; + + try { + conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection(); + connAutoCommit = conn.getAutoCommit(); + conn.setAutoCommit(false); + + preparedStatement = conn.prepareStatement("select * from xxl_job_lock where lock_name = 'flush_lock' for update"); + preparedStatement.execute(); + + recursion(0, conn); + } catch (Exception e) { + logger.error(">>>>>>>>>>> xxl-job, cluster flush error:{}", e); + } finally { + // commit + if (conn != null) { + try { + conn.commit(); + } catch (SQLException e) { + logger.error(e.getMessage(), e); + } + try { + conn.setAutoCommit(connAutoCommit); + } catch (SQLException e) { + logger.error(e.getMessage(), e); + } + try { + conn.close(); + } catch (SQLException e) { + logger.error(e.getMessage(), e); + } + } + + // close PreparedStatement + if (null != preparedStatement) { + try { + preparedStatement.close(); + } catch (SQLException e) { + logger.error(e.getMessage(), e); + } + } + } + } + + private void recursion(int id, Connection conn) { + List list = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().pageById(id); + if (CollectionUtils.isEmpty(list)) + return; + Map> map = new HashMap<>(); + list.forEach(x -> { + allocation(x); + List ids = map.getOrDefault(x.getHostName(), new ArrayList<>()); + ids.add(x.getId()); + map.put(x.getHostName(), ids); + }); + map.forEach((k, v) -> XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().updateHostNameByIds(k, v)); + try { + conn.commit(); + } catch (SQLException e) { + logger.error(e.getMessage(), e); + } + recursion(list.get(list.size() - 1).getId(), conn); + } + + private Integer getKey(long time) { + if (time < oneMinute) { + return oneMinute; + } else if (time < tenMinutes) { + return tenMinutes; + } else if (time < oneHour) { + return oneHour; + } else if (time < oneDay) { + return oneDay; + } else { + return Integer.MAX_VALUE; + } + } + +} + diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/service/impl/XxlJobServiceImpl.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/service/impl/XxlJobServiceImpl.java index 530ee41c..09022c5a 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/service/impl/XxlJobServiceImpl.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/service/impl/XxlJobServiceImpl.java @@ -1,5 +1,6 @@ package com.xxl.job.admin.service.impl; +import com.xxl.job.admin.core.conf.XxlJobAdminConfig; import com.xxl.job.admin.core.cron.CronExpression; import com.xxl.job.admin.core.model.XxlJobGroup; import com.xxl.job.admin.core.model.XxlJobInfo; @@ -10,6 +11,7 @@ import com.xxl.job.admin.core.scheduler.ScheduleTypeEnum; import com.xxl.job.admin.core.thread.JobScheduleHelper; import com.xxl.job.admin.core.util.I18nUtil; import com.xxl.job.admin.dao.*; +import com.xxl.job.admin.service.JobAllocation; import com.xxl.job.admin.service.XxlJobService; import com.xxl.job.core.biz.model.ReturnT; import com.xxl.job.core.enums.ExecutorBlockStrategyEnum; @@ -41,19 +43,19 @@ public class XxlJobServiceImpl implements XxlJobService { private XxlJobLogGlueDao xxlJobLogGlueDao; @Resource private XxlJobLogReportDao xxlJobLogReportDao; - + @Override public Map pageList(int start, int length, int jobGroup, int triggerStatus, String jobDesc, String executorHandler, String author) { // page list List list = xxlJobInfoDao.pageList(start, length, jobGroup, triggerStatus, jobDesc, executorHandler, author); int list_count = xxlJobInfoDao.pageListCount(start, length, jobGroup, triggerStatus, jobDesc, executorHandler, author); - + // package result Map maps = new HashMap(); - maps.put("recordsTotal", list_count); // 总记录数 - maps.put("recordsFiltered", list_count); // 过滤后的总记录数 - maps.put("data", list); // 分页列表 + maps.put("recordsTotal", list_count); // 总记录数 + maps.put("recordsFiltered", list_count); // 过滤后的总记录数 + maps.put("data", list); // 分页列表 return maps; } @@ -328,6 +330,7 @@ public class XxlJobServiceImpl implements XxlJobService { xxlJobInfo.setTriggerNextTime(nextTriggerTime); xxlJobInfo.setUpdateTime(new Date()); + XxlJobAdminConfig.getAdminConfig().getJobAllocation().allocation(xxlJobInfo); xxlJobInfoDao.update(xxlJobInfo); return ReturnT.SUCCESS; } diff --git a/xxl-job-admin/src/main/resources/application.properties b/xxl-job-admin/src/main/resources/application.properties index 8727b6c7..dd2b1d74 100644 --- a/xxl-job-admin/src/main/resources/application.properties +++ b/xxl-job-admin/src/main/resources/application.properties @@ -23,9 +23,9 @@ mybatis.mapper-locations=classpath:/mybatis-mapper/*Mapper.xml #mybatis.type-aliases-package=com.xxl.job.admin.core.model ### xxl-job, datasource -spring.datasource.url=jdbc:mysql://127.0.0.1:3306/xxl_job?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&serverTimezone=Asia/Shanghai +spring.datasource.url=jdbc:mysql://localhost/xxl_job?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&serverTimezone=Asia/Shanghai spring.datasource.username=root -spring.datasource.password=root_pwd +spring.datasource.password=12345678 spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver ### datasource-pool @@ -52,7 +52,7 @@ spring.mail.properties.mail.smtp.starttls.required=true spring.mail.properties.mail.smtp.socketFactory.class=javax.net.ssl.SSLSocketFactory ### xxl-job, access token -xxl.job.accessToken=default_token +xxl.job.accessToken= ### xxl-job, i18n (default is zh_CN, and you can choose "zh_CN", "zh_TC" and "en") xxl.job.i18n=zh_CN @@ -63,3 +63,8 @@ xxl.job.triggerpool.slow.max=100 ### xxl-job, log retention days xxl.job.logretentiondays=30 + +xxl.job.cluster.host.name= +xxl.job.cluster.enable=true + + diff --git a/xxl-job-admin/src/main/resources/logback.xml b/xxl-job-admin/src/main/resources/logback.xml index d4b08c24..154b272b 100644 --- a/xxl-job-admin/src/main/resources/logback.xml +++ b/xxl-job-admin/src/main/resources/logback.xml @@ -2,7 +2,7 @@ logback - + diff --git a/xxl-job-admin/src/main/resources/mybatis-mapper/XxlJobClusterMapper.xml b/xxl-job-admin/src/main/resources/mybatis-mapper/XxlJobClusterMapper.xml new file mode 100644 index 00000000..5bbd8c63 --- /dev/null +++ b/xxl-job-admin/src/main/resources/mybatis-mapper/XxlJobClusterMapper.xml @@ -0,0 +1,17 @@ + + + + + + replace into xxl_job_cluster values(#{ip}, now()); + + + delete from xxl_job_cluster where update_time <= #{time} + + + + + \ No newline at end of file diff --git a/xxl-job-admin/src/main/resources/mybatis-mapper/XxlJobInfoMapper.xml b/xxl-job-admin/src/main/resources/mybatis-mapper/XxlJobInfoMapper.xml index 7b3c3a3e..0f544bca 100644 --- a/xxl-job-admin/src/main/resources/mybatis-mapper/XxlJobInfoMapper.xml +++ b/xxl-job-admin/src/main/resources/mybatis-mapper/XxlJobInfoMapper.xml @@ -36,6 +36,8 @@ + + @@ -224,17 +226,53 @@ FROM xxl_job_info AS t WHERE t.trigger_status = 1 and t.trigger_next_time #{maxNextTime} + + and t.host_name=#{hostName} + and t.lock_status=0 + ORDER BY id ASC LIMIT #{pagesize} + + UPDATE xxl_job_info SET trigger_last_time = #{triggerLastTime}, trigger_next_time = #{triggerNextTime}, - trigger_status = #{triggerStatus} + trigger_status = #{triggerStatus}, + lock_status=#{lockStatus} WHERE id = #{id} + + update xxl_job_info set lock_status = 1 + where id in + + #{id} + + + + update xxl_job_info set host_name = #{hostName} + where id in + + #{id} + + + + update xxl_job_info a left join xxl_job_cluster b on a.host_name =b.host_name and b.update_time>=DATE_SUB(now(),INTERVAL 1 MINUTE) + set a.lock_status = 0 + where (b.host_name is null + + or a.host_name =#{hostName} + + ) and a.lock_status=1 + \ No newline at end of file diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/biz/client/ExecutorBizClient.java b/xxl-job-core/src/main/java/com/xxl/job/core/biz/client/ExecutorBizClient.java index 9f594309..dbbcffa8 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/biz/client/ExecutorBizClient.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/biz/client/ExecutorBizClient.java @@ -6,7 +6,7 @@ import com.xxl.job.core.util.XxlJobRemotingUtil; /** * admin api test - * + * 用于调用服务端接口 * @author xuxueli 2017-07-28 22:14:52 */ public class ExecutorBizClient implements ExecutorBiz { diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/biz/impl/ExecutorBizImpl.java b/xxl-job-core/src/main/java/com/xxl/job/core/biz/impl/ExecutorBizImpl.java index 8bdf7093..6990e651 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/biz/impl/ExecutorBizImpl.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/biz/impl/ExecutorBizImpl.java @@ -16,7 +16,7 @@ import org.slf4j.LoggerFactory; import java.util.Date; -/** +/** 用于处理接收请求 * Created by xuxueli on 17/3/1. */ public class ExecutorBizImpl implements ExecutorBiz { @@ -46,15 +46,18 @@ public class ExecutorBizImpl implements ExecutorBiz { @Override public ReturnT run(TriggerParam triggerParam) { // load old:jobHandler + jobThread + //TODO 获取有没有正在执行的线程 JobThread jobThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId()); IJobHandler jobHandler = jobThread!=null?jobThread.getHandler():null; String removeOldReason = null; // valid:jobHandler + jobThread GlueTypeEnum glueTypeEnum = GlueTypeEnum.match(triggerParam.getGlueType()); + //TODO 最常用的 bean 模式 if (GlueTypeEnum.BEAN == glueTypeEnum) { - // new jobhandler + // new jobhandler 通过解析对象时包装的map获取 + //TODO 通过一开始解析的xxljob注解 获取对应的对象 IJobHandler newJobHandler = XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler()); // valid old jobThread @@ -74,7 +77,9 @@ public class ExecutorBizImpl implements ExecutorBiz { } } - } else if (GlueTypeEnum.GLUE_GROOVY == glueTypeEnum) { + } + //groovy 脚本 忽略 + else if (GlueTypeEnum.GLUE_GROOVY == glueTypeEnum) { // valid old jobThread if (jobThread != null && @@ -97,7 +102,9 @@ public class ExecutorBizImpl implements ExecutorBiz { return new ReturnT(ReturnT.FAIL_CODE, e.getMessage()); } } - } else if (glueTypeEnum!=null && glueTypeEnum.isScript()) { + } + //其他脚本模式 忽略 + else if (glueTypeEnum!=null && glueTypeEnum.isScript()) { // valid old jobThread if (jobThread != null && @@ -120,13 +127,16 @@ public class ExecutorBizImpl implements ExecutorBiz { // executor block strategy if (jobThread != null) { + //TODO 阻塞处理策略 注意点 这边阻塞策略是在客户端 实现的 如果路由策略 每次执行的节点不一样 就会导致重复执行 ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null); if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) { + //丢弃后续调用 // discard when running if (jobThread.isRunningOrHasQueue()) { return new ReturnT(ReturnT.FAIL_CODE, "block strategy effect:"+ExecutorBlockStrategyEnum.DISCARD_LATER.getTitle()); } } else if (ExecutorBlockStrategyEnum.COVER_EARLY == blockStrategy) { + //覆盖之前调用 // kill running jobThread if (jobThread.isRunningOrHasQueue()) { removeOldReason = "block strategy effect:" + ExecutorBlockStrategyEnum.COVER_EARLY.getTitle(); @@ -134,16 +144,19 @@ public class ExecutorBizImpl implements ExecutorBiz { jobThread = null; } } else { + //单机串行 // just queue trigger } } // replace thread (new or exists invalid) if (jobThread == null) { + //TODO 开启一个执行业务的线程 如果是覆盖之前的调用 则会触发之前任务的 interrupt jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason); } // push data to queue + //TODO 讲当前任务添加进队列 上一步开启的线程 会扫描这个队列 ReturnT pushResult = jobThread.pushTriggerQueue(triggerParam); return pushResult; } diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/executor/XxlJobExecutor.java b/xxl-job-core/src/main/java/com/xxl/job/core/executor/XxlJobExecutor.java index 4719b7b7..e3fb55c5 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/executor/XxlJobExecutor.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/executor/XxlJobExecutor.java @@ -67,20 +67,22 @@ public class XxlJobExecutor { // ---------------------- start + stop ---------------------- public void start() throws Exception { - // init logpath + //TODO 初始化日志路径 XxlJobFileAppender.initLogPath(logPath); // init invoker, admin-client + //TODO 初始化服务器地址 initAdminBizList(adminAddresses, accessToken); - // init JobLogFileCleanThread + //TODO init JobLogFileCleanThread 开启一个线程清理日志 JobLogFileCleanThread.getInstance().start(logRetentionDays); - // init TriggerCallbackThread + //TODO init TriggerCallbackThread 调用服务端 /api/callback 反馈任务执行结果 TriggerCallbackThread.getInstance().start(); // init executor-server + //TODO 初始化XXLJOB服务 initEmbedServer(address, ip, port, appname, accessToken); } @@ -117,6 +119,13 @@ public class XxlJobExecutor { // ---------------------- admin-client (rpc invoker) ---------------------- private static List adminBizList; + + /** + * 初始化xxl 服务地址 解析包装成 AdminBizClient 存到 adminBizList内 (地址支持多个以,隔开) + * @param adminAddresses + * @param accessToken + * @throws Exception + */ private void initAdminBizList(String adminAddresses, String accessToken) throws Exception { if (adminAddresses!=null && adminAddresses.trim().length()>0) { for (String address: adminAddresses.trim().split(",")) { @@ -147,6 +156,7 @@ public class XxlJobExecutor { ip = (ip!=null&&ip.trim().length()>0)?ip: IpUtil.getIp(); // generate address + //TODO 优先使用address if (address==null || address.trim().length()==0) { String ip_port_address = IpUtil.getIpPort(ip, port); // registry-address:default use address to registry , otherwise use ip:port if address is null address = "http://{ip_port}/".replace("{ip_port}", ip_port_address); @@ -159,6 +169,7 @@ public class XxlJobExecutor { // start embedServer = new EmbedServer(); + //TODO embedServer.start(address, port, appname, accessToken); } @@ -175,6 +186,7 @@ public class XxlJobExecutor { // ---------------------- job handler repository ---------------------- + //存储业务对象 key为 xxljob.value private static ConcurrentMap jobHandlerRepository = new ConcurrentHashMap(); public static IJobHandler loadJobHandler(String name){ return jobHandlerRepository.get(name); @@ -208,7 +220,7 @@ public class XxlJobExecutor { throw new RuntimeException("xxl-job method-jobhandler return-classtype invalid, for[" + bean.getClass() + "#" + method.getName() + "] , " + "The correct method format like \" public ReturnT execute(String param) \" ."); }*/ - + //TODO 设置访问权限 说明私有的方法也是支持的 executeMethod.setAccessible(true); // init and destroy @@ -217,6 +229,7 @@ public class XxlJobExecutor { if (xxlJob.init().trim().length() > 0) { try { + //获取初始化方法 initMethod = clazz.getDeclaredMethod(xxlJob.init()); initMethod.setAccessible(true); } catch (NoSuchMethodException e) { @@ -241,6 +254,7 @@ public class XxlJobExecutor { // ---------------------- job thread repository ---------------------- private static ConcurrentMap jobThreadRepository = new ConcurrentHashMap(); public static JobThread registJobThread(int jobId, IJobHandler handler, String removeOldReason){ + //TODO 处理业务 JobThread newJobThread = new JobThread(jobId, handler); newJobThread.start(); logger.info(">>>>>>>>>>> xxl-job regist JobThread success, jobId:{}, handler:{}", new Object[]{jobId, handler}); diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/executor/impl/XxlJobSimpleExecutor.java b/xxl-job-core/src/main/java/com/xxl/job/core/executor/impl/XxlJobSimpleExecutor.java index 53efbb95..33ecf5d3 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/executor/impl/XxlJobSimpleExecutor.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/executor/impl/XxlJobSimpleExecutor.java @@ -14,6 +14,7 @@ import java.util.Map; /** * xxl-job executor (for frameless) + * 非spring项目 使用 * * @author xuxueli 2020-11-05 */ diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/executor/impl/XxlJobSpringExecutor.java b/xxl-job-core/src/main/java/com/xxl/job/core/executor/impl/XxlJobSpringExecutor.java index 953903c3..d3c9062c 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/executor/impl/XxlJobSpringExecutor.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/executor/impl/XxlJobSpringExecutor.java @@ -28,6 +28,10 @@ public class XxlJobSpringExecutor extends XxlJobExecutor implements ApplicationC // start + + /** + * 所有单例初始化以后调用 + */ @Override public void afterSingletonsInstantiated() { @@ -35,13 +39,15 @@ public class XxlJobSpringExecutor extends XxlJobExecutor implements ApplicationC /*initJobHandlerRepository(applicationContext);*/ // init JobHandler Repository (for method) + //初始化xxljob注解对象 initJobHandlerMethodRepository(applicationContext); - // refresh GlueFactory + //初始化 glueFactory 用于执行groovy脚本 GlueFactory.refreshInstance(1); // super start try { + //TODO super.start(); } catch (Exception e) { throw new RuntimeException(e); @@ -77,6 +83,10 @@ public class XxlJobSpringExecutor extends XxlJobExecutor implements ApplicationC } }*/ + /** + * 扫描有XxlJob 注解的方法 保存到 XxlJobExecutor.jobHandlerRepository(map) + * @param applicationContext + */ private void initJobHandlerMethodRepository(ApplicationContext applicationContext) { if (applicationContext == null) { return; diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/log/XxlJobFileAppender.java b/xxl-job-core/src/main/java/com/xxl/job/core/log/XxlJobFileAppender.java index ff0585b5..ee1054a4 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/log/XxlJobFileAppender.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/log/XxlJobFileAppender.java @@ -30,6 +30,11 @@ public class XxlJobFileAppender { */ private static String logBasePath = "/data/applogs/xxl-job/jobhandler"; private static String glueSrcPath = logBasePath.concat("/gluesource"); + + /** + * 初始化日志路径 会将 glue脚本 保存到日志目录子目录下 + * @param logPath + */ public static void initLogPath(String logPath){ // init if (logPath!=null && logPath.trim().length()>0) { diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/server/EmbedServer.java b/xxl-job-core/src/main/java/com/xxl/job/core/server/EmbedServer.java index 540e0ea2..eb2a37d5 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/server/EmbedServer.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/server/EmbedServer.java @@ -36,11 +36,14 @@ public class EmbedServer { public void start(final String address, final int port, final String appname, final String accessToken) { executorBiz = new ExecutorBizImpl(); thread = new Thread(new Runnable() { + @Override public void run() { + // param EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); + //TODO 用于接收服务端请求的线程池 ThreadPoolExecutor bizThreadPool = new ThreadPoolExecutor( 0, 200, @@ -59,6 +62,8 @@ public class EmbedServer { throw new RuntimeException("xxl-job, EmbedServer bizThreadPool is EXHAUSTED!"); } }); + + try { // start server ServerBootstrap bootstrap = new ServerBootstrap(); @@ -71,6 +76,7 @@ public class EmbedServer { .addLast(new IdleStateHandler(0, 0, 30 * 3, TimeUnit.SECONDS)) // beat 3N, close if idle .addLast(new HttpServerCodec()) .addLast(new HttpObjectAggregator(5 * 1024 * 1024)) // merge request & reponse to FULL + //TODO 接收请求 .addLast(new EmbedHttpServerHandler(executorBiz, accessToken, bizThreadPool)); } }) @@ -81,16 +87,18 @@ public class EmbedServer { logger.info(">>>>>>>>>>> xxl-job remoting server start success, nettype = {}, port = {}", EmbedServer.class, port); - // start registry + //TODO 调用 api/registry 注册当前节点 startRegistry(appname, address); // wait util stop future.channel().closeFuture().sync(); } catch (InterruptedException e) { - logger.info(">>>>>>>>>>> xxl-job remoting server stop."); - } catch (Exception e) { - logger.error(">>>>>>>>>>> xxl-job remoting server error.", e); + if (e instanceof InterruptedException) { + logger.info(">>>>>>>>>>> xxl-job remoting server stop."); + } else { + logger.error(">>>>>>>>>>> xxl-job remoting server error.", e); + } } finally { // stop try { @@ -100,15 +108,17 @@ public class EmbedServer { logger.error(e.getMessage(), e); } } + } + }); - thread.setDaemon(true); // daemon, service jvm, user thread leave >>> daemon leave >>> jvm leave + thread.setDaemon(true); // daemon, service jvm, user thread leave >>> daemon leave >>> jvm leave thread.start(); } public void stop() throws Exception { // destroy server thread - if (thread != null && thread.isAlive()) { + if (thread!=null && thread.isAlive()) { thread.interrupt(); } @@ -122,7 +132,7 @@ public class EmbedServer { /** * netty_http - *

+ * * Copy from : https://github.com/xuxueli/xxl-rpc * * @author xuxueli 2015-11-24 22:25:15 @@ -133,7 +143,6 @@ public class EmbedServer { private ExecutorBiz executorBiz; private String accessToken; private ThreadPoolExecutor bizThreadPool; - public EmbedHttpServerHandler(ExecutorBiz executorBiz, String accessToken, ThreadPoolExecutor bizThreadPool) { this.executorBiz = executorBiz; this.accessToken = accessToken; @@ -142,6 +151,7 @@ public class EmbedServer { @Override protected void channelRead0(final ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception { + // request parse //final byte[] requestBytes = ByteBufUtil.getBytes(msg.content()); // byteBuf.toString(io.netty.util.CharsetUtil.UTF_8); String requestData = msg.content().toString(CharsetUtil.UTF_8); @@ -150,11 +160,12 @@ public class EmbedServer { boolean keepAlive = HttpUtil.isKeepAlive(msg); String accessTokenReq = msg.headers().get(XxlJobRemotingUtil.XXL_JOB_ACCESS_TOKEN); - // invoke + // TODO 接收请求以后立刻提交给另一个线程池 bizThreadPool.execute(new Runnable() { @Override public void run() { // do invoke + //TODO Object responseObj = process(httpMethod, uri, requestData, accessTokenReq); // to json @@ -167,38 +178,46 @@ public class EmbedServer { } private Object process(HttpMethod httpMethod, String uri, String requestData, String accessTokenReq) { - // valid + + //#################### 校验 if (HttpMethod.POST != httpMethod) { return new ReturnT(ReturnT.FAIL_CODE, "invalid request, HttpMethod not support."); } - if (uri == null || uri.trim().length() == 0) { + if (uri==null || uri.trim().length()==0) { return new ReturnT(ReturnT.FAIL_CODE, "invalid request, uri-mapping empty."); } - if (accessToken != null - && accessToken.trim().length() > 0 + if (accessToken!=null + && accessToken.trim().length()>0 && !accessToken.equals(accessTokenReq)) { return new ReturnT(ReturnT.FAIL_CODE, "The access token is wrong."); } + //###################### + + // services mapping try { - switch (uri) { - case "/beat": - return executorBiz.beat(); - case "/idleBeat": - IdleBeatParam idleBeatParam = GsonTool.fromJson(requestData, IdleBeatParam.class); - return executorBiz.idleBeat(idleBeatParam); - case "/run": - TriggerParam triggerParam = GsonTool.fromJson(requestData, TriggerParam.class); - return executorBiz.run(triggerParam); - case "/kill": - KillParam killParam = GsonTool.fromJson(requestData, KillParam.class); - return executorBiz.kill(killParam); - case "/log": - LogParam logParam = GsonTool.fromJson(requestData, LogParam.class); - return executorBiz.log(logParam); - default: - return new ReturnT(ReturnT.FAIL_CODE, "invalid request, uri-mapping(" + uri + ") not found."); + if ("/beat".equals(uri)) { + //TODO 用于 路由策略 故障转移 + return executorBiz.beat(); + } else if ("/idleBeat".equals(uri)) { + //TODO 用于 路由策略 忙碌转移 + IdleBeatParam idleBeatParam = GsonTool.fromJson(requestData, IdleBeatParam.class); + return executorBiz.idleBeat(idleBeatParam); + } else if ("/run".equals(uri)) { + //TODO 执行任务 + TriggerParam triggerParam = GsonTool.fromJson(requestData, TriggerParam.class); + return executorBiz.run(triggerParam); + } else if ("/kill".equals(uri)) { + //TODO 中止任务 + KillParam killParam = GsonTool.fromJson(requestData, KillParam.class); + return executorBiz.kill(killParam); + } else if ("/log".equals(uri)) { + //TODO 查询日志 注意 实时通过客户端查询日志 如果客户端是在容器中每次部署要注意下日志路径 + LogParam logParam = GsonTool.fromJson(requestData, LogParam.class); + return executorBiz.log(logParam); + } else { + return new ReturnT(ReturnT.FAIL_CODE, "invalid request, uri-mapping("+ uri +") not found."); } } catch (Exception e) { logger.error(e.getMessage(), e); @@ -253,4 +272,6 @@ public class EmbedServer { // stop registry ExecutorRegistryThread.getInstance().toStop(); } + + } diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/thread/ExecutorRegistryThread.java b/xxl-job-core/src/main/java/com/xxl/job/core/thread/ExecutorRegistryThread.java index e43a2a49..3365c1d3 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/thread/ExecutorRegistryThread.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/thread/ExecutorRegistryThread.java @@ -34,7 +34,7 @@ public class ExecutorRegistryThread { logger.warn(">>>>>>>>>>> xxl-job, executor registry config fail, adminAddresses is null."); return; } - + //开启线程往服务器注册 每隔30秒请求一次 registryThread = new Thread(new Runnable() { @Override public void run() { @@ -76,7 +76,7 @@ public class ExecutorRegistryThread { } } - // registry remove + // TODO registry remove 如果当前服务中止 则会立刻调用 移除接口 try { RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address); for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) { diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/thread/JobLogFileCleanThread.java b/xxl-job-core/src/main/java/com/xxl/job/core/thread/JobLogFileCleanThread.java index b5691542..e0802080 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/thread/JobLogFileCleanThread.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/thread/JobLogFileCleanThread.java @@ -27,9 +27,14 @@ public class JobLogFileCleanThread { private Thread localThread; private volatile boolean toStop = false; + + /** + * 扫描指定路径下 日期格式的 文件夹 校验如果是指定日期之前的文件则删除 处理完了以后就sleep一天 支持 interrupt + * @param logRetentionDays + */ public void start(final long logRetentionDays){ - // limit min value + // TDOD 注意点 最小值三天一清理 if (logRetentionDays < 3 ) { return; } @@ -54,7 +59,7 @@ public class JobLogFileCleanThread { for (File childFile: childDirs) { - // valid + // valid 生成的日志文件时日期格式的文件夹 if (!childFile.isDirectory()) { continue; } @@ -73,7 +78,7 @@ public class JobLogFileCleanThread { if (logFileCreateDate == null) { continue; } - + //基于日期校验是不是指定日期之前的文件夹 是则删除 if ((todayDate.getTime()-logFileCreateDate.getTime()) >= logRetentionDays * (24 * 60 * 60 * 1000) ) { FileUtil.deleteRecursively(childFile); } diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/thread/JobThread.java b/xxl-job-core/src/main/java/com/xxl/job/core/thread/JobThread.java index cf07a55a..ba923c65 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/thread/JobThread.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/thread/JobThread.java @@ -98,8 +98,10 @@ public class JobThread extends Thread{ // init try { + //TODO 调用 xxljob注解的 init方法 handler.init(); } catch (Throwable e) { + //TODO 初始化方法报错 并不会影响正常流程执行 如果 初始化涉及业务相关 要注意 logger.error(e.getMessage(), e); } @@ -111,6 +113,7 @@ public class JobThread extends Thread{ TriggerParam triggerParam = null; try { // to check toStop signal, we need cycle, so wo cannot use queue.take(), instand of poll(timeout) + //TODO triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS); if (triggerParam!=null) { running = true; @@ -123,15 +126,18 @@ public class JobThread extends Thread{ triggerParam.getJobId(), triggerParam.getExecutorParams(), logFileName, + //TODO 当前节点序号 triggerParam.getBroadcastIndex(), + //TODO 节点总数 triggerParam.getBroadcastTotal()); - // init job context + // init job context 包装执行的一些参数 通过 threadlocal 存取 XxlJobContext.setXxlJobContext(xxlJobContext); // execute XxlJobHelper.log("
----------- xxl-job job execute start -----------
----------- Param:" + xxlJobContext.getJobParam()); + //TODO 执行超时时间 if (triggerParam.getExecutorTimeout() > 0) { // limit timeout Thread futureThread = null; @@ -162,6 +168,7 @@ public class JobThread extends Thread{ futureThread.interrupt(); } } else { + //如果没有执行超时时间的限制 则 直接调用业务方法 // just execute handler.execute(); } @@ -183,6 +190,8 @@ public class JobThread extends Thread{ ); } else { + //TODO 每3秒获取一次执行参数 超过30次 且 依然没有 就 删除对应的线程 删除线程会将top置为true 这个设计应该是为了一些 执行频繁的任务设计的 一直死循环跑任务 + //TODO 30这个数字应该可配置 线程存活时间太长了 if (idleTimes > 30) { if(triggerQueue.size() == 0) { // avoid concurrent trigger causes jobId-lost XxlJobExecutor.removeJobThread(jobId, "excutor idel times over limit."); @@ -207,6 +216,7 @@ public class JobThread extends Thread{ // callback handler info if (!toStop) { // commonm + //TODO 填充callBackQueue 队列 用于讲任务执行结果返回给服务端 TriggerCallbackThread.pushCallBack(new HandleCallbackParam( triggerParam.getLogId(), triggerParam.getLogDateTime(), @@ -227,6 +237,7 @@ public class JobThread extends Thread{ } // callback trigger request in queue + //TODO 待确认 while(triggerQueue !=null && triggerQueue.size()>0){ TriggerParam triggerParam = triggerQueue.poll(); if (triggerParam!=null) { diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/thread/TriggerCallbackThread.java b/xxl-job-core/src/main/java/com/xxl/job/core/thread/TriggerCallbackThread.java index 40acac00..0d74968e 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/thread/TriggerCallbackThread.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/thread/TriggerCallbackThread.java @@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit; /** * Created by xuxueli on 16/7/22. */ + public class TriggerCallbackThread { private static Logger logger = LoggerFactory.getLogger(TriggerCallbackThread.class); @@ -32,7 +33,7 @@ public class TriggerCallbackThread { } /** - * job results callback queue + * job results callback queue 有序阻塞队列 */ private LinkedBlockingQueue callBackQueue = new LinkedBlockingQueue(); public static void pushCallBack(HandleCallbackParam callback){ @@ -43,6 +44,10 @@ public class TriggerCallbackThread { /** * callback thread */ + /** TODO 队列中的数据 是在每次任务执行完以后填充的 传递code及msg + * 一直循环从队列中获取数据 发生给xxl服务器(/api/callback) + * 如果失败了会写入log目录下 triggerRetryCallbackThread 线程会每隔30秒(可配置)重新发送 + */ private Thread triggerCallbackThread; private Thread triggerRetryCallbackThread; private volatile boolean toStop = false; @@ -63,16 +68,19 @@ public class TriggerCallbackThread { // normal callback while(!toStop){ try { + //TODO 从队列中取一个 阻塞方法 HandleCallbackParam callback = getInstance().callBackQueue.take(); if (callback != null) { // callback list param List callbackParamList = new ArrayList(); + //TODO 将队列中所有对象全部取出 int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList); callbackParamList.add(callback); // callback, will retry if error if (callbackParamList!=null && callbackParamList.size()>0) { + //TODO doCallback(callbackParamList); } } @@ -84,6 +92,7 @@ public class TriggerCallbackThread { } // last callback + //TODO 跳出循环 说明xxljob生命周期结束了 防止有数据还停留在队列中 try { List callbackParamList = new ArrayList(); int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList); @@ -104,7 +113,7 @@ public class TriggerCallbackThread { triggerCallbackThread.start(); - // retry + // TODO 扫描上一个线程 请求异常的数据 重新发送 triggerRetryCallbackThread = new Thread(new Runnable() { @Override public void run() { @@ -163,10 +172,13 @@ public class TriggerCallbackThread { private void doCallback(List callbackParamList){ boolean callbackRet = false; // callback, will retry if error + //getAdminBizList 所有xxl服务器地址 for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) { try { + //TODO 调用服务端 callback接口 ReturnT callbackResult = adminBiz.callback(callbackParamList); if (callbackResult!=null && ReturnT.SUCCESS_CODE == callbackResult.getCode()) { + //日志记录 callbackLog(callbackParamList, "
----------- xxl-job job callback finish."); callbackRet = true; break; @@ -178,6 +190,7 @@ public class TriggerCallbackThread { } } if (!callbackRet) { + //TODO 请求服务端异常处理(写进日志文件) 如果部分成功部分失败 是不是重复劳动 目前不会有这种场景 appendFailCallbackFile(callbackParamList); } } @@ -251,6 +264,7 @@ public class TriggerCallbackThread { List callbackParamList = (List) JdkSerializeTool.deserialize(callbackParamList_bytes, List.class); + //TODO 读取完了 直接删除 callbaclLogFile.delete(); doCallback(callbackParamList); } diff --git a/xxl-job-executor-samples/xxl-job-executor-sample-springboot/src/main/java/com/xxl/job/executor/service/jobhandler/SampleXxlJob.java b/xxl-job-executor-samples/xxl-job-executor-sample-springboot/src/main/java/com/xxl/job/executor/service/jobhandler/SampleXxlJob.java index 759d6625..a1000951 100644 --- a/xxl-job-executor-samples/xxl-job-executor-sample-springboot/src/main/java/com/xxl/job/executor/service/jobhandler/SampleXxlJob.java +++ b/xxl-job-executor-samples/xxl-job-executor-sample-springboot/src/main/java/com/xxl/job/executor/service/jobhandler/SampleXxlJob.java @@ -1,5 +1,6 @@ package com.xxl.job.executor.service.jobhandler; +import com.xxl.job.core.context.XxlJobContext; import com.xxl.job.core.context.XxlJobHelper; import com.xxl.job.core.handler.annotation.XxlJob; import org.slf4j.Logger; @@ -36,6 +37,7 @@ public class SampleXxlJob { */ @XxlJob("demoJobHandler") public void demoJobHandler() throws Exception { + logger.info(XxlJobHelper.getJobParam()); XxlJobHelper.log("XXL-JOB, Hello World."); for (int i = 0; i < 5; i++) { diff --git a/xxl-job-executor-samples/xxl-job-executor-sample-springboot/src/main/resources/application.properties b/xxl-job-executor-samples/xxl-job-executor-sample-springboot/src/main/resources/application.properties index 14c796e8..f00e877a 100644 --- a/xxl-job-executor-samples/xxl-job-executor-sample-springboot/src/main/resources/application.properties +++ b/xxl-job-executor-samples/xxl-job-executor-sample-springboot/src/main/resources/application.properties @@ -1,5 +1,5 @@ # web port -server.port=8081 +server.port=8084 # no web #spring.main.web-environment=false @@ -8,7 +8,7 @@ logging.config=classpath:logback.xml ### xxl-job admin address list, such as "http://address" or "http://address01,http://address02" -xxl.job.admin.addresses=http://127.0.0.1:8080/xxl-job-admin +xxl.job.admin.addresses=http://127.0.0.1:8081/xxl-job-admin ### xxl-job, access token xxl.job.accessToken=default_token @@ -16,11 +16,12 @@ xxl.job.accessToken=default_token ### xxl-job executor appname xxl.job.executor.appname=xxl-job-executor-sample ### xxl-job executor registry-address: default use address to registry , otherwise use ip:port if address is null +### 注册到服务端的地址 与 ip:port2选1 优先address xxl.job.executor.address= ### xxl-job executor server-info xxl.job.executor.ip= xxl.job.executor.port=9999 ### xxl-job executor log-path -xxl.job.executor.logpath=/data/applogs/xxl-job/jobhandler +xxl.job.executor.logpath=../data/applogs/xxl-job/jobhandler ### xxl-job executor log-retention-days xxl.job.executor.logretentiondays=30 diff --git a/xxl-job-executor-samples/xxl-job-executor-sample-springboot/src/main/resources/logback.xml b/xxl-job-executor-samples/xxl-job-executor-sample-springboot/src/main/resources/logback.xml index d5a0d2ca..89f526de 100644 --- a/xxl-job-executor-samples/xxl-job-executor-sample-springboot/src/main/resources/logback.xml +++ b/xxl-job-executor-samples/xxl-job-executor-sample-springboot/src/main/resources/logback.xml @@ -2,7 +2,7 @@ logback - +