pull/1/head
xueli.xue 8 years ago
parent 725d1f4fe5
commit b8e196606d

@ -149,8 +149,8 @@ public class JobLogController {
ReturnT<String> runResult = executorBiz.kill(String.valueOf(log.getJobGroup()), log.getJobName()); ReturnT<String> runResult = executorBiz.kill(String.valueOf(log.getJobGroup()), log.getJobName());
if (ReturnT.SUCCESS_CODE == runResult.getCode()) { if (ReturnT.SUCCESS_CODE == runResult.getCode()) {
log.setHandleCode(ReturnT.SUCCESS_CODE); log.setHandleCode(ReturnT.FAIL_CODE);
log.setHandleMsg("人为操作主动终止"); log.setHandleMsg("人为操作主动终止:" + (runResult.getMsg()!=null?runResult.getMsg():""));
log.setHandleTime(new Date()); log.setHandleTime(new Date());
xxlJobLogDao.updateHandleInfo(log); xxlJobLogDao.updateHandleInfo(log);
return new ReturnT<String>(runResult.getMsg()); return new ReturnT<String>(runResult.getMsg());

@ -74,14 +74,14 @@ public class RemoteHttpJobBean extends QuartzJobBean {
} }
// failover trigger // failover trigger
ReturnT<String> responseModel = failoverTrigger(addressList, triggerParam, jobLog); ReturnT<String> triggerResult = failoverTrigger(addressList, triggerParam, jobLog);
jobLog.setExecutorHandler(jobInfo.getExecutorHandler()); jobLog.setExecutorHandler(jobInfo.getExecutorHandler());
jobLog.setExecutorParam(jobInfo.getExecutorParam()); jobLog.setExecutorParam(jobInfo.getExecutorParam());
logger.info(">>>>>>>>>>> xxl-job failoverTrigger response, jobId:{}, responseModel:{}", jobLog.getId(), responseModel.toString()); logger.info(">>>>>>>>>>> xxl-job failoverTrigger, jobId:{}, triggerResult:{}", jobLog.getId(), triggerResult.toString());
// update trigger info 2/2 // update trigger info 2/2
jobLog.setTriggerCode(responseModel.getCode()); jobLog.setTriggerCode(triggerResult.getCode());
jobLog.setTriggerMsg(responseModel.getMsg()); jobLog.setTriggerMsg(triggerResult.getMsg());
XxlJobDynamicScheduler.xxlJobLogDao.updateTriggerInfo(jobLog); XxlJobDynamicScheduler.xxlJobLogDao.updateTriggerInfo(jobLog);
// monitor triger // monitor triger

@ -2,6 +2,7 @@ package com.xxl.job.admin.core.schedule;
import com.xxl.job.admin.core.jobbean.RemoteHttpJobBean; import com.xxl.job.admin.core.jobbean.RemoteHttpJobBean;
import com.xxl.job.admin.core.model.XxlJobInfo; import com.xxl.job.admin.core.model.XxlJobInfo;
import com.xxl.job.admin.core.thread.JobMonitorHelper;
import com.xxl.job.admin.core.thread.JobRegistryHelper; import com.xxl.job.admin.core.thread.JobRegistryHelper;
import com.xxl.job.admin.dao.IXxlJobGroupDao; import com.xxl.job.admin.dao.IXxlJobGroupDao;
import com.xxl.job.admin.dao.IXxlJobInfoDao; import com.xxl.job.admin.dao.IXxlJobInfoDao;
@ -67,12 +68,21 @@ public final class XxlJobDynamicScheduler implements ApplicationContextAware, In
callbackAddress = IpUtil.getIpPort(callBackPort);; callbackAddress = IpUtil.getIpPort(callBackPort);;
} }
// init JobRegistryHelper // admin registry run
JobRegistryHelper.discover("g", "k"); JobRegistryHelper.getInstance().start();
// admin monitor run
JobMonitorHelper.getInstance().start();
} }
// destroy // destroy
public void destroy(){ public void destroy(){
// admin registry stop
JobRegistryHelper.getInstance().stop();
// admin monitor stop
JobMonitorHelper.getInstance().stop();
serverFactory.destroy(); serverFactory.destroy();
} }

@ -16,28 +16,32 @@ import java.util.Set;
import java.util.concurrent.*; import java.util.concurrent.*;
/** /**
* job monitor helper * job monitor instance
* @author xuxueli 2015-9-1 18:05:56 * @author xuxueli 2015-9-1 18:05:56
*/ */
public class JobMonitorHelper { public class JobMonitorHelper {
private static Logger logger = LoggerFactory.getLogger(JobMonitorHelper.class); private static Logger logger = LoggerFactory.getLogger(JobMonitorHelper.class);
private static JobMonitorHelper helper = new JobMonitorHelper(); private static JobMonitorHelper instance = new JobMonitorHelper();
private ExecutorService executor = Executors.newCachedThreadPool(); public static JobMonitorHelper getInstance(){
return instance;
}
private LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<Integer>(0xfff8); private LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<Integer>(0xfff8);
private ConcurrentHashMap<String, Integer> countMap = new ConcurrentHashMap<String, Integer>();
public JobMonitorHelper(){ private Thread monitorThread;
// consumer private boolean toStop = false;
executor.execute(new Runnable() { public void start(){
monitorThread = new Thread(new Runnable() {
@Override @Override
public void run() { public void run() {
while (true) { while (true) {
try { try {
logger.debug(">>>>>>>>>>> job monitor beat ... "); logger.debug(">>>>>>>>>>> job monitor beat ... ");
Integer jobLogId = JobMonitorHelper.helper.queue.take(); Integer jobLogId = JobMonitorHelper.instance.queue.take();
if (jobLogId != null && jobLogId > 0) { if (jobLogId != null && jobLogId > 0) {
logger.info(">>>>>>>>>>> job monitor heat success, JobLogId:{}", jobLogId); logger.debug(">>>>>>>>>>> job monitor heat success, JobLogId:{}", jobLogId);
XxlJobLog log = XxlJobDynamicScheduler.xxlJobLogDao.load(jobLogId); XxlJobLog log = XxlJobDynamicScheduler.xxlJobLogDao.load(jobLogId);
if (log!=null) { if (log!=null) {
if (ReturnT.SUCCESS_CODE==log.getTriggerCode() && log.getHandleCode()==0) { if (ReturnT.SUCCESS_CODE==log.getTriggerCode() && log.getHandleCode()==0) {
@ -73,11 +77,18 @@ public class JobMonitorHelper {
} }
} }
}); });
monitorThread.setDaemon(true);
monitorThread.start();
}
public void stop(){
toStop = true;
//monitorThread.interrupt();
} }
// producer // producer
public static void monitor(int jobLogId){ public static void monitor(int jobLogId){
JobMonitorHelper.helper.queue.offer(jobLogId); getInstance().queue.offer(jobLogId);
} }
} }

@ -12,20 +12,26 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
/** /**
* job registry helper * job registry instance
* @author xuxueli 2016-10-02 19:10:24 * @author xuxueli 2016-10-02 19:10:24
*/ */
public class JobRegistryHelper { public class JobRegistryHelper {
private static Logger logger = LoggerFactory.getLogger(JobRegistryHelper.class); private static Logger logger = LoggerFactory.getLogger(JobRegistryHelper.class);
private static JobRegistryHelper helper = new JobRegistryHelper(); private static JobRegistryHelper instance = new JobRegistryHelper();
public static JobRegistryHelper getInstance(){
return instance;
}
private ConcurrentHashMap<String, List<String>> registMap = new ConcurrentHashMap<String, List<String>>(); private ConcurrentHashMap<String, List<String>> registMap = new ConcurrentHashMap<String, List<String>>();
public JobRegistryHelper(){ private Thread registryThread;
Thread registryThread = new Thread(new Runnable() { private boolean toStop = false;
public void start(){
registryThread = new Thread(new Runnable() {
@Override @Override
public void run() { public void run() {
while (true) { while (!toStop) {
try { try {
// registry admin // registry admin
int ret = XxlJobDynamicScheduler.xxlJobRegistryDao.registryUpdate(RegistHelper.RegistType.ADMIN.name(), RegistHelper.RegistType.ADMIN.name(), XxlJobDynamicScheduler.getCallbackAddress()); int ret = XxlJobDynamicScheduler.xxlJobRegistryDao.registryUpdate(RegistHelper.RegistType.ADMIN.name(), RegistHelper.RegistType.ADMIN.name(), XxlJobDynamicScheduler.getCallbackAddress());
@ -50,19 +56,23 @@ public class JobRegistryHelper {
} }
registMap = temp; registMap = temp;
} catch (Exception e) { } catch (Exception e) {
logger.error("job registry helper error:{}", e); logger.error("job registry instance error:{}", e);
} }
try { try {
TimeUnit.SECONDS.sleep(RegistHelper.TIMEOUT); TimeUnit.SECONDS.sleep(RegistHelper.TIMEOUT);
} catch (InterruptedException e) { } catch (InterruptedException e) {
logger.error("job registry helper error:{}", e); logger.error("job registry instance error:{}", e);
} }
} }
} }
}); });
registryThread.setDaemon(true); registryThread.setDaemon(true);
registryThread.start(); registryThread.start();
}
public void stop(){
toStop = true;
//registryThread.interrupt();
} }
private static String makeGroupKey(String registryGroup, String registryKey){ private static String makeGroupKey(String registryGroup, String registryKey){
@ -71,7 +81,7 @@ public class JobRegistryHelper {
public static List<String> discover(String registryGroup, String registryKey){ public static List<String> discover(String registryGroup, String registryKey){
String groupKey = makeGroupKey(registryGroup, registryKey); String groupKey = makeGroupKey(registryGroup, registryKey);
return helper.registMap.get(groupKey); return instance.registMap.get(groupKey);
} }
} }

@ -173,7 +173,7 @@
<#macro commonFooter > <#macro commonFooter >
<footer class="main-footer"> <footer class="main-footer">
<div class="pull-right hidden-xs"> <div class="pull-right hidden-xs">
<b>Version</b> 1.5 <b>Version</b> 1.6
</div> </div>
<strong>Copyright &copy; 2015-${.now?string('yyyy')} &nbsp; <strong>Copyright &copy; 2015-${.now?string('yyyy')} &nbsp;
<a href="https://github.com/xuxueli/xxl-job" target="_blank" >github</a>&nbsp; <a href="https://github.com/xuxueli/xxl-job" target="_blank" >github</a>&nbsp;

@ -82,10 +82,10 @@
<th name="executorHandler" >JobHandler</th> <th name="executorHandler" >JobHandler</th>
<th name="executorParam" >任务参数</th> <th name="executorParam" >任务参数</th>
<th name="triggerTime" >调度时间</th> <th name="triggerTime" >调度时间</th>
<th name="triggerStatus" >调度结果</th> <th name="triggerCode" >调度结果</th>
<th name="triggerMsg" >调度备注</th> <th name="triggerMsg" >调度备注</th>
<th name="handleTime" >执行时间</th> <th name="handleTime" >执行时间</th>
<th name="handleStatus" >执行结果</th> <th name="handleCode" >执行结果</th>
<th name="handleMsg" >执行备注</th> <th name="handleMsg" >执行备注</th>
<th name="handleMsg" >操作</th> <th name="handleMsg" >操作</th>
</tr> </tr>

@ -112,7 +112,13 @@ $(function() {
return data?moment(new Date(data)).format("YYYY-MM-DD HH:mm:ss"):""; return data?moment(new Date(data)).format("YYYY-MM-DD HH:mm:ss"):"";
} }
}, },
{ "data": 'triggerStatus'}, {
"data": 'triggerCode',
"render": function ( data, type, row ) {
return (data==200)?'<span style="color: green">成功</span>':(data==500)?'<span style="color: red">失败</span>':(data==0)?'':data;
}
},
{ {
"data": 'triggerMsg', "data": 'triggerMsg',
"render": function ( data, type, row ) { "render": function ( data, type, row ) {
@ -125,7 +131,12 @@ $(function() {
return data?moment(new Date(data)).format("YYYY-MM-DD HH:mm:ss"):""; return data?moment(new Date(data)).format("YYYY-MM-DD HH:mm:ss"):"";
} }
}, },
{ "data": 'handleStatus',"bSortable": false}, {
"data": 'handleCode',
"render": function ( data, type, row ) {
return (data==200)?'<span style="color: green">成功</span>':(data==500)?'<span style="color: red">失败</span>':(data==0)?'':data;
}
},
{ {
"data": 'handleMsg', "data": 'handleMsg',
"render": function ( data, type, row ) { "render": function ( data, type, row ) {
@ -136,9 +147,9 @@ $(function() {
"render": function ( data, type, row ) { "render": function ( data, type, row ) {
// better support expression or string, not function // better support expression or string, not function
return function () { return function () {
if (row.triggerStatus == 'SUCCESS' || row.handleStatus){ if (row.triggerCode == 200){
var temp = '<a href="javascript:;" class="logDetail" _id="'+ row.id +'">执行日志</a>'; var temp = '<a href="javascript:;" class="logDetail" _id="'+ row.id +'">执行日志</a>';
if(!row.handleStatus){ if(row.handleCode == 0){
temp += '<br><a href="javascript:;" class="logKill" _id="'+ row.id +'">终止任务</a>'; temp += '<br><a href="javascript:;" class="logKill" _id="'+ row.id +'">终止任务</a>';
} }
return temp; return temp;

@ -35,11 +35,11 @@ public class ExecutorBizImpl implements ExecutorBiz {
IJobHandler handler = jobThread.getHandler(); IJobHandler handler = jobThread.getHandler();
jobThread.toStop("人工手动终止"); jobThread.toStop("人工手动终止");
jobThread.interrupt(); jobThread.interrupt();
//XxlJobExecutor.registJobThread(jobKey, handler); XxlJobExecutor.removeJobThread(jobKey);
return ReturnT.SUCCESS; return ReturnT.SUCCESS;
} }
return new ReturnT<String>(ReturnT.FAIL_CODE, "job thread not found."); return new ReturnT<String>(ReturnT.SUCCESS_CODE, "job thread aleady killed.");
} }
@Override @Override

@ -101,5 +101,8 @@ public class XxlJobExecutor implements ApplicationContextAware, ApplicationListe
public static JobThread loadJobThread(String jobKey){ public static JobThread loadJobThread(String jobKey){
return JobThreadRepository.get(jobKey); return JobThreadRepository.get(jobKey);
} }
public static void removeJobThread(String jobKey){
JobThreadRepository.remove(jobKey);
}
} }

@ -0,0 +1,8 @@
package com.xxl.job.core.thread;
/**
* Created by xuxueli on 17/3/2.
*/
public class ExecutorRegistryThread extends Thread {
}
Loading…
Cancel
Save