From e43791d4a15401d4433a0645614351ca26ed0fdb Mon Sep 17 00:00:00 2001 From: xuxueli <931591021@qq.com> Date: Sun, 23 Sep 2018 01:22:50 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BB=BB=E5=8A=A1=E8=B0=83=E5=BA=A6=E6=A8=A1?= =?UTF-8?q?=E5=9D=97=EF=BC=8C=E5=BA=95=E5=B1=82=E4=BB=A3=E7=A0=81=E9=87=8D?= =?UTF-8?q?=E6=9E=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../job/admin/core/route/ExecutorRouter.java | 4 +- .../route/strategy/ExecutorRouteBusyover.java | 5 +- .../strategy/ExecutorRouteConsistentHash.java | 7 +- .../route/strategy/ExecutorRouteFailover.java | 5 +- .../route/strategy/ExecutorRouteFirst.java | 5 +- .../core/route/strategy/ExecutorRouteLFU.java | 5 +- .../core/route/strategy/ExecutorRouteLRU.java | 7 +- .../route/strategy/ExecutorRouteLast.java | 5 +- .../route/strategy/ExecutorRouteRandom.java | 5 +- .../route/strategy/ExecutorRouteRound.java | 5 +- .../job/admin/core/trigger/XxlJobTrigger.java | 219 +++++++----------- .../mybatis-mapper/XxlJobLogMapper.xml | 2 + 12 files changed, 109 insertions(+), 165 deletions(-) diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/ExecutorRouter.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/ExecutorRouter.java index 5a9623f8..5de9a1d0 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/ExecutorRouter.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/ExecutorRouter.java @@ -5,7 +5,7 @@ import com.xxl.job.core.biz.model.TriggerParam; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; +import java.util.List; /** * Created by xuxueli on 17/3/10. @@ -19,6 +19,6 @@ public abstract class ExecutorRouter { * @param addressList * @return ReturnT.content=address */ - public abstract ReturnT route(TriggerParam triggerParam, ArrayList addressList); + public abstract ReturnT route(TriggerParam triggerParam, List addressList); } diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/strategy/ExecutorRouteBusyover.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/strategy/ExecutorRouteBusyover.java index 55c3b67f..40d8373f 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/strategy/ExecutorRouteBusyover.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/strategy/ExecutorRouteBusyover.java @@ -2,13 +2,12 @@ package com.xxl.job.admin.core.route.strategy; import com.xxl.job.admin.core.route.ExecutorRouter; import com.xxl.job.admin.core.schedule.XxlJobDynamicScheduler; -import com.xxl.job.admin.core.trigger.XxlJobTrigger; import com.xxl.job.admin.core.util.I18nUtil; import com.xxl.job.core.biz.ExecutorBiz; import com.xxl.job.core.biz.model.ReturnT; import com.xxl.job.core.biz.model.TriggerParam; -import java.util.ArrayList; +import java.util.List; /** * Created by xuxueli on 17/3/10. @@ -16,7 +15,7 @@ import java.util.ArrayList; public class ExecutorRouteBusyover extends ExecutorRouter { @Override - public ReturnT route(TriggerParam triggerParam, ArrayList addressList) { + public ReturnT route(TriggerParam triggerParam, List addressList) { StringBuffer idleBeatResultSB = new StringBuffer(); for (String address : addressList) { // beat diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/strategy/ExecutorRouteConsistentHash.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/strategy/ExecutorRouteConsistentHash.java index 3abb5de1..72e8118f 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/strategy/ExecutorRouteConsistentHash.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/strategy/ExecutorRouteConsistentHash.java @@ -1,14 +1,13 @@ package com.xxl.job.admin.core.route.strategy; import com.xxl.job.admin.core.route.ExecutorRouter; -import com.xxl.job.admin.core.trigger.XxlJobTrigger; import com.xxl.job.core.biz.model.ReturnT; import com.xxl.job.core.biz.model.TriggerParam; import java.io.UnsupportedEncodingException; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; -import java.util.ArrayList; +import java.util.List; import java.util.SortedMap; import java.util.TreeMap; @@ -57,7 +56,7 @@ public class ExecutorRouteConsistentHash extends ExecutorRouter { return truncateHashCode; } - public String hashJob(int jobId, ArrayList addressList) { + public String hashJob(int jobId, List addressList) { // ------A1------A2-------A3------ // -----------J1------------------ @@ -78,7 +77,7 @@ public class ExecutorRouteConsistentHash extends ExecutorRouter { } @Override - public ReturnT route(TriggerParam triggerParam, ArrayList addressList) { + public ReturnT route(TriggerParam triggerParam, List addressList) { String address = hashJob(triggerParam.getJobId(), addressList); return new ReturnT(address); } diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/strategy/ExecutorRouteFailover.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/strategy/ExecutorRouteFailover.java index 98181ff2..63428094 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/strategy/ExecutorRouteFailover.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/strategy/ExecutorRouteFailover.java @@ -2,13 +2,12 @@ package com.xxl.job.admin.core.route.strategy; import com.xxl.job.admin.core.route.ExecutorRouter; import com.xxl.job.admin.core.schedule.XxlJobDynamicScheduler; -import com.xxl.job.admin.core.trigger.XxlJobTrigger; import com.xxl.job.admin.core.util.I18nUtil; import com.xxl.job.core.biz.ExecutorBiz; import com.xxl.job.core.biz.model.ReturnT; import com.xxl.job.core.biz.model.TriggerParam; -import java.util.ArrayList; +import java.util.List; /** * Created by xuxueli on 17/3/10. @@ -16,7 +15,7 @@ import java.util.ArrayList; public class ExecutorRouteFailover extends ExecutorRouter { @Override - public ReturnT route(TriggerParam triggerParam, ArrayList addressList) { + public ReturnT route(TriggerParam triggerParam, List addressList) { StringBuffer beatResultSB = new StringBuffer(); for (String address : addressList) { diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/strategy/ExecutorRouteFirst.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/strategy/ExecutorRouteFirst.java index a64be3c7..de4d7afb 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/strategy/ExecutorRouteFirst.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/strategy/ExecutorRouteFirst.java @@ -1,11 +1,10 @@ package com.xxl.job.admin.core.route.strategy; import com.xxl.job.admin.core.route.ExecutorRouter; -import com.xxl.job.admin.core.trigger.XxlJobTrigger; import com.xxl.job.core.biz.model.ReturnT; import com.xxl.job.core.biz.model.TriggerParam; -import java.util.ArrayList; +import java.util.List; /** * Created by xuxueli on 17/3/10. @@ -13,7 +12,7 @@ import java.util.ArrayList; public class ExecutorRouteFirst extends ExecutorRouter { @Override - public ReturnT route(TriggerParam triggerParam, ArrayList addressList){ + public ReturnT route(TriggerParam triggerParam, List addressList){ return new ReturnT(addressList.get(0)); } diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/strategy/ExecutorRouteLFU.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/strategy/ExecutorRouteLFU.java index d4cce7e5..d5ca8da1 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/strategy/ExecutorRouteLFU.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/strategy/ExecutorRouteLFU.java @@ -1,7 +1,6 @@ package com.xxl.job.admin.core.route.strategy; import com.xxl.job.admin.core.route.ExecutorRouter; -import com.xxl.job.admin.core.trigger.XxlJobTrigger; import com.xxl.job.core.biz.model.ReturnT; import com.xxl.job.core.biz.model.TriggerParam; @@ -20,7 +19,7 @@ public class ExecutorRouteLFU extends ExecutorRouter { private static ConcurrentHashMap> jobLfuMap = new ConcurrentHashMap>(); private static long CACHE_VALID_TIME = 0; - public String route(int jobId, ArrayList addressList) { + public String route(int jobId, List addressList) { // cache clear if (System.currentTimeMillis() > CACHE_VALID_TIME) { @@ -57,7 +56,7 @@ public class ExecutorRouteLFU extends ExecutorRouter { } @Override - public ReturnT route(TriggerParam triggerParam, ArrayList addressList) { + public ReturnT route(TriggerParam triggerParam, List addressList) { String address = route(triggerParam.getJobId(), addressList); return new ReturnT(address); } diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/strategy/ExecutorRouteLRU.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/strategy/ExecutorRouteLRU.java index 60c233e9..bea509e0 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/strategy/ExecutorRouteLRU.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/strategy/ExecutorRouteLRU.java @@ -1,12 +1,11 @@ package com.xxl.job.admin.core.route.strategy; import com.xxl.job.admin.core.route.ExecutorRouter; -import com.xxl.job.admin.core.trigger.XxlJobTrigger; import com.xxl.job.core.biz.model.ReturnT; import com.xxl.job.core.biz.model.TriggerParam; -import java.util.ArrayList; import java.util.LinkedHashMap; +import java.util.List; import java.util.concurrent.ConcurrentHashMap; /** @@ -21,7 +20,7 @@ public class ExecutorRouteLRU extends ExecutorRouter { private static ConcurrentHashMap> jobLRUMap = new ConcurrentHashMap>(); private static long CACHE_VALID_TIME = 0; - public String route(int jobId, ArrayList addressList) { + public String route(int jobId, List addressList) { // cache clear if (System.currentTimeMillis() > CACHE_VALID_TIME) { @@ -55,7 +54,7 @@ public class ExecutorRouteLRU extends ExecutorRouter { } @Override - public ReturnT route(TriggerParam triggerParam, ArrayList addressList) { + public ReturnT route(TriggerParam triggerParam, List addressList) { String address = route(triggerParam.getJobId(), addressList); return new ReturnT(address); } diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/strategy/ExecutorRouteLast.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/strategy/ExecutorRouteLast.java index bcf9d6f8..4ff3cf6b 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/strategy/ExecutorRouteLast.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/strategy/ExecutorRouteLast.java @@ -1,11 +1,10 @@ package com.xxl.job.admin.core.route.strategy; import com.xxl.job.admin.core.route.ExecutorRouter; -import com.xxl.job.admin.core.trigger.XxlJobTrigger; import com.xxl.job.core.biz.model.ReturnT; import com.xxl.job.core.biz.model.TriggerParam; -import java.util.ArrayList; +import java.util.List; /** * Created by xuxueli on 17/3/10. @@ -13,7 +12,7 @@ import java.util.ArrayList; public class ExecutorRouteLast extends ExecutorRouter { @Override - public ReturnT route(TriggerParam triggerParam, ArrayList addressList) { + public ReturnT route(TriggerParam triggerParam, List addressList) { return new ReturnT(addressList.get(addressList.size()-1)); } diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/strategy/ExecutorRouteRandom.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/strategy/ExecutorRouteRandom.java index 7c383468..5ea4a384 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/strategy/ExecutorRouteRandom.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/strategy/ExecutorRouteRandom.java @@ -1,11 +1,10 @@ package com.xxl.job.admin.core.route.strategy; import com.xxl.job.admin.core.route.ExecutorRouter; -import com.xxl.job.admin.core.trigger.XxlJobTrigger; import com.xxl.job.core.biz.model.ReturnT; import com.xxl.job.core.biz.model.TriggerParam; -import java.util.ArrayList; +import java.util.List; import java.util.Random; /** @@ -16,7 +15,7 @@ public class ExecutorRouteRandom extends ExecutorRouter { private static Random localRandom = new Random(); @Override - public ReturnT route(TriggerParam triggerParam, ArrayList addressList) { + public ReturnT route(TriggerParam triggerParam, List addressList) { String address = addressList.get(localRandom.nextInt(addressList.size())); return new ReturnT(address); } diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/strategy/ExecutorRouteRound.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/strategy/ExecutorRouteRound.java index 365f59fe..d297e259 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/strategy/ExecutorRouteRound.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/route/strategy/ExecutorRouteRound.java @@ -1,11 +1,10 @@ package com.xxl.job.admin.core.route.strategy; import com.xxl.job.admin.core.route.ExecutorRouter; -import com.xxl.job.admin.core.trigger.XxlJobTrigger; import com.xxl.job.core.biz.model.ReturnT; import com.xxl.job.core.biz.model.TriggerParam; -import java.util.ArrayList; +import java.util.List; import java.util.Random; import java.util.concurrent.ConcurrentHashMap; @@ -31,7 +30,7 @@ public class ExecutorRouteRound extends ExecutorRouter { } @Override - public ReturnT route(TriggerParam triggerParam, ArrayList addressList) { + public ReturnT route(TriggerParam triggerParam, List addressList) { String address = addressList.get(count(triggerParam.getJobId())%addressList.size()); return new ReturnT(address); } diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/trigger/XxlJobTrigger.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/trigger/XxlJobTrigger.java index 1212dfd4..07307dc6 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/trigger/XxlJobTrigger.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/trigger/XxlJobTrigger.java @@ -18,6 +18,7 @@ import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.Date; +import java.util.List; /** * xxl-job trigger @@ -36,165 +37,116 @@ public class XxlJobTrigger { * */ public static void trigger(int jobId, int failRetryCount, TriggerTypeEnum triggerType) { - // load data XxlJobInfo jobInfo = XxlJobDynamicScheduler.xxlJobInfoDao.loadById(jobId); // job info if (jobInfo == null) { logger.warn(">>>>>>>>>>>> trigger fail, jobId invalid,jobId={}", jobId); return; } - int finalFailRetryCount = jobInfo.getExecutorFailRetryCount(); - if (failRetryCount >= 0) { - finalFailRetryCount = failRetryCount; + int finalFailRetryCount = failRetryCount>=0?failRetryCount:jobInfo.getExecutorFailRetryCount(); + XxlJobGroup group = XxlJobDynamicScheduler.xxlJobGroupDao.load(jobInfo.getJobGroup()); // group info + + // process trigger + if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null) && CollectionUtils.isNotEmpty(group.getRegistryList())) { + for (int i = 0; i < group.getRegistryList().size(); i++) { + processTrigger(group, jobInfo, finalFailRetryCount, triggerType, i); + } + } else { + processTrigger(group, jobInfo, finalFailRetryCount, triggerType, 0); } + } - XxlJobGroup group = XxlJobDynamicScheduler.xxlJobGroupDao.load(jobInfo.getJobGroup()); // group info + private static void processTrigger(XxlJobGroup group, XxlJobInfo jobInfo, int finalFailRetryCount, TriggerTypeEnum triggerType, int index){ + // param ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(jobInfo.getExecutorBlockStrategy(), ExecutorBlockStrategyEnum.SERIAL_EXECUTION); // block strategy ExecutorRouteStrategyEnum executorRouteStrategyEnum = ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null); // route strategy - ArrayList addressList = (ArrayList) group.getRegistryList(); - - // broadcast - if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum && CollectionUtils.isNotEmpty(addressList)) { - for (int i = 0; i < addressList.size(); i++) { - String address = addressList.get(i); - - // 1、save log-id - XxlJobLog jobLog = new XxlJobLog(); - jobLog.setJobGroup(jobInfo.getJobGroup()); - jobLog.setJobId(jobInfo.getId()); - XxlJobDynamicScheduler.xxlJobLogDao.save(jobLog); - logger.debug(">>>>>>>>>>> xxl-job trigger start, jobId:{}", jobLog.getId()); - - // 2、prepare trigger-info - //jobLog.setExecutorAddress(executorAddress); - jobLog.setExecutorHandler(jobInfo.getExecutorHandler()); - jobLog.setExecutorParam(jobInfo.getExecutorParam()); - jobLog.setExecutorFailRetryCount(finalFailRetryCount); - jobLog.setTriggerTime(new Date()); - - ReturnT triggerResult = new ReturnT(null); - StringBuffer triggerMsgSb = new StringBuffer(); - triggerMsgSb.append(I18nUtil.getString("jobconf_trigger_type")).append(":").append(triggerType.getTitle()); - triggerMsgSb.append("
").append(I18nUtil.getString("jobconf_trigger_admin_adress")).append(":").append(IpUtil.getIp()); - triggerMsgSb.append("
").append(I18nUtil.getString("jobconf_trigger_exe_regtype")).append(":") - .append( (group.getAddressType() == 0)?I18nUtil.getString("jobgroup_field_addressType_0"):I18nUtil.getString("jobgroup_field_addressType_1") ); - triggerMsgSb.append("
").append(I18nUtil.getString("jobconf_trigger_exe_regaddress")).append(":").append(group.getRegistryList()); - triggerMsgSb.append("
").append(I18nUtil.getString("jobinfo_field_executorRouteStrategy")).append(":").append(executorRouteStrategyEnum.getTitle()).append("("+i+"/"+addressList.size()+")"); // update01 - triggerMsgSb.append("
").append(I18nUtil.getString("jobinfo_field_executorBlockStrategy")).append(":").append(blockStrategy.getTitle()); - triggerMsgSb.append("
").append(I18nUtil.getString("jobinfo_field_timeout")).append(":").append(jobInfo.getExecutorTimeout()); - triggerMsgSb.append("
").append(I18nUtil.getString("jobinfo_field_executorFailRetryCount")).append(":").append(finalFailRetryCount); - - - // 3.1、trigger-param - TriggerParam triggerParam = new TriggerParam(); - triggerParam.setJobId(jobInfo.getId()); - triggerParam.setExecutorHandler(jobInfo.getExecutorHandler()); - triggerParam.setExecutorParams(jobInfo.getExecutorParam()); - triggerParam.setExecutorBlockStrategy(jobInfo.getExecutorBlockStrategy()); - triggerParam.setExecutorTimeout(jobInfo.getExecutorTimeout()); - triggerParam.setLogId(jobLog.getId()); - triggerParam.setLogDateTim(jobLog.getTriggerTime().getTime()); - triggerParam.setGlueType(jobInfo.getGlueType()); - triggerParam.setGlueSource(jobInfo.getGlueSource()); - triggerParam.setGlueUpdatetime(jobInfo.getGlueUpdatetime().getTime()); - triggerParam.setBroadcastIndex(i); - triggerParam.setBroadcastTotal(addressList.size()); // update02 - - // 3.2、trigger-run (route run / trigger remote executor) - triggerResult = runExecutor(triggerParam, address); // update03 - triggerMsgSb.append("

>>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_run") +"<<<<<<<<<<<
").append(triggerResult.getMsg()); - - // 4、save trigger-info - jobLog.setExecutorAddress(triggerResult.getContent()); - jobLog.setTriggerCode(triggerResult.getCode()); - jobLog.setTriggerMsg(triggerMsgSb.toString()); - XxlJobDynamicScheduler.xxlJobLogDao.updateTriggerInfo(jobLog); - - // 5、monitor trigger - JobFailMonitorHelper.monitor(jobLog.getId()); - logger.debug(">>>>>>>>>>> xxl-job trigger end, jobId:{}", jobLog.getId()); - - } - } else { - // 1、save log-id - XxlJobLog jobLog = new XxlJobLog(); - jobLog.setJobGroup(jobInfo.getJobGroup()); - jobLog.setJobId(jobInfo.getId()); - XxlJobDynamicScheduler.xxlJobLogDao.save(jobLog); - logger.debug(">>>>>>>>>>> xxl-job trigger start, jobId:{}", jobLog.getId()); - - // 2、prepare trigger-info - //jobLog.setExecutorAddress(executorAddress); - jobLog.setExecutorHandler(jobInfo.getExecutorHandler()); - jobLog.setExecutorParam(jobInfo.getExecutorParam()); - jobLog.setExecutorFailRetryCount(finalFailRetryCount); - jobLog.setTriggerTime(new Date()); - - ReturnT triggerResult = new ReturnT(null); - StringBuffer triggerMsgSb = new StringBuffer(); - triggerMsgSb.append(I18nUtil.getString("jobconf_trigger_type")).append(":").append(triggerType.getTitle()); - triggerMsgSb.append("
").append(I18nUtil.getString("jobconf_trigger_admin_adress")).append(":").append(IpUtil.getIp()); - triggerMsgSb.append("
").append(I18nUtil.getString("jobconf_trigger_exe_regtype")).append(":") - .append( (group.getAddressType() == 0)?I18nUtil.getString("jobgroup_field_addressType_0"):I18nUtil.getString("jobgroup_field_addressType_1") ); - triggerMsgSb.append("
").append(I18nUtil.getString("jobconf_trigger_exe_regaddress")).append(":").append(group.getRegistryList()); - triggerMsgSb.append("
").append(I18nUtil.getString("jobinfo_field_executorRouteStrategy")).append(":").append(executorRouteStrategyEnum.getTitle()); - triggerMsgSb.append("
").append(I18nUtil.getString("jobinfo_field_executorBlockStrategy")).append(":").append(blockStrategy.getTitle()); - triggerMsgSb.append("
").append(I18nUtil.getString("jobinfo_field_timeout")).append(":").append(jobInfo.getExecutorTimeout()); - triggerMsgSb.append("
").append(I18nUtil.getString("jobinfo_field_executorFailRetryCount")).append(":").append(finalFailRetryCount); - - // 3.0、trigger-valid - String address = null; - if (CollectionUtils.isEmpty(addressList)) { - triggerResult.setCode(ReturnT.FAIL_CODE); - triggerMsgSb.append("
----------------------
").append(I18nUtil.getString("jobconf_trigger_address_empty")); + // 1、save log-id + XxlJobLog jobLog = new XxlJobLog(); + jobLog.setJobGroup(jobInfo.getJobGroup()); + jobLog.setJobId(jobInfo.getId()); + jobLog.setTriggerTime(new Date()); + XxlJobDynamicScheduler.xxlJobLogDao.save(jobLog); + logger.debug(">>>>>>>>>>> xxl-job trigger start, jobId:{}", jobLog.getId()); + + // 2、init trigger-param + TriggerParam triggerParam = new TriggerParam(); + triggerParam.setJobId(jobInfo.getId()); + triggerParam.setExecutorHandler(jobInfo.getExecutorHandler()); + triggerParam.setExecutorParams(jobInfo.getExecutorParam()); + triggerParam.setExecutorBlockStrategy(jobInfo.getExecutorBlockStrategy()); + triggerParam.setExecutorTimeout(jobInfo.getExecutorTimeout()); + triggerParam.setLogId(jobLog.getId()); + triggerParam.setLogDateTim(jobLog.getTriggerTime().getTime()); + triggerParam.setGlueType(jobInfo.getGlueType()); + triggerParam.setGlueSource(jobInfo.getGlueSource()); + triggerParam.setGlueUpdatetime(jobInfo.getGlueUpdatetime().getTime()); + triggerParam.setBroadcastIndex(index); + triggerParam.setBroadcastTotal(group.getRegistryList()!=null?group.getRegistryList().size():0); + + // 3、init address + String address = null; + ReturnT routeAddressResult = null; + if (CollectionUtils.isNotEmpty(group.getRegistryList())) { + if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum) { + address = group.getRegistryList().get(index); } else { - // 3.1、trigger-param - TriggerParam triggerParam = new TriggerParam(); - triggerParam.setJobId(jobInfo.getId()); - triggerParam.setExecutorHandler(jobInfo.getExecutorHandler()); - triggerParam.setExecutorParams(jobInfo.getExecutorParam()); - triggerParam.setExecutorBlockStrategy(jobInfo.getExecutorBlockStrategy()); - triggerParam.setExecutorTimeout(jobInfo.getExecutorTimeout()); - triggerParam.setLogId(jobLog.getId()); - triggerParam.setLogDateTim(jobLog.getTriggerTime().getTime()); - triggerParam.setGlueType(jobInfo.getGlueType()); - triggerParam.setGlueSource(jobInfo.getGlueSource()); - triggerParam.setGlueUpdatetime(jobInfo.getGlueUpdatetime().getTime()); - triggerParam.setBroadcastIndex(0); - triggerParam.setBroadcastTotal(1); - - // 3.2、trigger-run (route run / trigger remote executor) - //triggerResult = executorRouteStrategyEnum.getRouter().routeRun(triggerParam, addressList); - ReturnT routeAddressResult = executorRouteStrategyEnum.getRouter().route(triggerParam, addressList); + routeAddressResult = executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList()); if (routeAddressResult.getCode() == ReturnT.SUCCESS_CODE) { address = routeAddressResult.getContent(); - triggerResult = runExecutor(triggerParam, address); } - triggerMsgSb.append("

>>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_run") +"<<<<<<<<<<<
") - .append(routeAddressResult.getMsg()!=null?routeAddressResult.getMsg()+"

":"").append(triggerResult.getMsg()!=null?triggerResult.getMsg():""); - } + } else { + routeAddressResult = new ReturnT(ReturnT.FAIL_CODE, "
----------------------
" + I18nUtil.getString("jobconf_trigger_address_empty")); + } - // 4、save trigger-info - jobLog.setExecutorAddress(address); - jobLog.setTriggerCode(triggerResult.getCode()); - jobLog.setTriggerMsg(triggerMsgSb.toString()); - XxlJobDynamicScheduler.xxlJobLogDao.updateTriggerInfo(jobLog); - - // 5、monitor trigger - JobFailMonitorHelper.monitor(jobLog.getId()); - logger.debug(">>>>>>>>>>> xxl-job trigger end, jobId:{}", jobLog.getId()); + // 4、trigger remote executor + ReturnT triggerResult = null; + if (address != null) { + triggerResult = runExecutor(triggerParam, address); + } else { + triggerResult = new ReturnT(ReturnT.FAIL_CODE, null); } + // 5、collection trigger info + StringBuffer triggerMsgSb = new StringBuffer(); + triggerMsgSb.append(I18nUtil.getString("jobconf_trigger_type")).append(":").append(triggerType.getTitle()); + triggerMsgSb.append("
").append(I18nUtil.getString("jobconf_trigger_admin_adress")).append(":").append(IpUtil.getIp()); + triggerMsgSb.append("
").append(I18nUtil.getString("jobconf_trigger_exe_regtype")).append(":") + .append( (group.getAddressType() == 0)?I18nUtil.getString("jobgroup_field_addressType_0"):I18nUtil.getString("jobgroup_field_addressType_1") ); + triggerMsgSb.append("
").append(I18nUtil.getString("jobconf_trigger_exe_regaddress")).append(":").append(group.getRegistryList()); + triggerMsgSb.append("
").append(I18nUtil.getString("jobinfo_field_executorRouteStrategy")).append(":").append(executorRouteStrategyEnum.getTitle()); + if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null)) { + triggerMsgSb.append("("+index+"/"+(group.getRegistryList()!=null?group.getRegistryList().size():0)+")"); + } + triggerMsgSb.append("
").append(I18nUtil.getString("jobinfo_field_executorBlockStrategy")).append(":").append(blockStrategy.getTitle()); + triggerMsgSb.append("
").append(I18nUtil.getString("jobinfo_field_timeout")).append(":").append(jobInfo.getExecutorTimeout()); + triggerMsgSb.append("
").append(I18nUtil.getString("jobinfo_field_executorFailRetryCount")).append(":").append(finalFailRetryCount); + + triggerMsgSb.append("

>>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_run") +"<<<<<<<<<<<
") + .append((routeAddressResult!=null&&routeAddressResult.getMsg()!=null)?routeAddressResult.getMsg()+"

":"").append(triggerResult.getMsg()!=null?triggerResult.getMsg():""); + + // 6、save log trigger-info + jobLog.setExecutorAddress(address); + jobLog.setExecutorHandler(jobInfo.getExecutorHandler()); + jobLog.setExecutorParam(jobInfo.getExecutorParam()); + jobLog.setExecutorFailRetryCount(finalFailRetryCount); + //jobLog.setTriggerTime(); + jobLog.setTriggerCode(triggerResult.getCode()); + jobLog.setTriggerMsg(triggerMsgSb.toString()); + XxlJobDynamicScheduler.xxlJobLogDao.updateTriggerInfo(jobLog); + + // 7、monitor trigger + JobFailMonitorHelper.monitor(jobLog.getId()); + logger.debug(">>>>>>>>>>> xxl-job trigger end, jobId:{}", jobLog.getId()); } /** * run executor * @param triggerParam * @param address - * @return ReturnT.content: final address + * @return */ public static ReturnT runExecutor(TriggerParam triggerParam, String address){ ReturnT runResult = null; @@ -212,7 +164,6 @@ public class XxlJobTrigger { runResultSB.append("
msg:").append(runResult.getMsg()); runResult.setMsg(runResultSB.toString()); - runResult.setContent(address); return runResult; } diff --git a/xxl-job-admin/src/main/resources/mybatis-mapper/XxlJobLogMapper.xml b/xxl-job-admin/src/main/resources/mybatis-mapper/XxlJobLogMapper.xml index bc0aa8ec..66c2c59c 100644 --- a/xxl-job-admin/src/main/resources/mybatis-mapper/XxlJobLogMapper.xml +++ b/xxl-job-admin/src/main/resources/mybatis-mapper/XxlJobLogMapper.xml @@ -117,11 +117,13 @@ INSERT INTO XXL_JOB_QRTZ_TRIGGER_LOG ( `job_group`, `job_id`, + `trigger_time`, `trigger_code`, `handle_code` ) VALUES ( #{jobGroup}, #{jobId}, + #{triggerTime}, #{triggerCode}, #{handleCode} );