From ee9eca0db718b6e8c074698ae0b2f9110432dd2a Mon Sep 17 00:00:00 2001 From: "xueli.xue" Date: Sat, 12 Mar 2016 18:39:28 +0800 Subject: [PATCH] =?UTF-8?q?=E8=BF=9C=E7=A8=8B=E8=B0=83=E5=BA=A6=E5=A2=9E?= =?UTF-8?q?=E5=BC=BA=E6=A0=A1=E9=AA=8C=EF=BC=9A1=E3=80=81=E8=B0=83?= =?UTF-8?q?=E5=BA=A6=E4=B8=AD=E5=BF=83=E4=B8=8E=E6=89=A7=E8=A1=8C=E5=99=A8?= =?UTF-8?q?=E6=97=B6=E9=97=B4=E5=81=8F=E7=A7=BB=E8=B6=85=E8=BF=8760?= =?UTF-8?q?=E7=A7=92=E6=8B=92=E7=BB=9D=E4=BB=BB=E5=8A=A1=E6=89=A7=E8=A1=8C?= =?UTF-8?q?=EF=BC=9B2=E3=80=81=E6=89=A7=E8=A1=8C=E5=99=A8=E6=A0=A1?= =?UTF-8?q?=E9=AA=8C=E8=B0=83=E5=BA=A6uuid=E9=81=BF=E5=85=8D=E4=B8=80?= =?UTF-8?q?=E6=AC=A1=E8=B0=83=E5=BA=A6=E9=87=8D=E5=A4=8D=E6=89=A7=E8=A1=8C?= =?UTF-8?q?=EF=BC=9B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../xxl/job/service/job/RemoteHttpJobBean.java | 1 + .../xxl/job/client/handler/HandlerRepository.java | 8 ++++++++ .../com/xxl/job/client/handler/HandlerThread.java | 15 +++++++++++---- 3 files changed, 20 insertions(+), 4 deletions(-) diff --git a/xxl-job-admin/src/main/java/com/xxl/job/service/job/RemoteHttpJobBean.java b/xxl-job-admin/src/main/java/com/xxl/job/service/job/RemoteHttpJobBean.java index b61d956b..cf0b0cb4 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/service/job/RemoteHttpJobBean.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/service/job/RemoteHttpJobBean.java @@ -57,6 +57,7 @@ public class RemoteHttpJobBean extends QuartzJobBean { HashMap params = new HashMap(); params.put(HandlerRepository.TRIGGER_LOG_URL, PropertiesUtil.getString(HandlerRepository.TRIGGER_LOG_URL)); params.put(HandlerRepository.TRIGGER_LOG_ID, String.valueOf(jobLog.getId())); + params.put(HandlerRepository.TRIGGER_TIMESTAMP, String.valueOf(System.currentTimeMillis())); params.put(HandlerRepository.HANDLER_NAME, jobDataMap.get(HandlerRepository.HANDLER_NAME)); params.put(HandlerRepository.HANDLER_PARAMS, jobDataMap.get(HandlerRepository.HANDLER_PARAMS)); diff --git a/xxl-job-client/src/main/java/com/xxl/job/client/handler/HandlerRepository.java b/xxl-job-client/src/main/java/com/xxl/job/client/handler/HandlerRepository.java index 71255c02..4fdfd0b4 100644 --- a/xxl-job-client/src/main/java/com/xxl/job/client/handler/HandlerRepository.java +++ b/xxl-job-client/src/main/java/com/xxl/job/client/handler/HandlerRepository.java @@ -22,6 +22,7 @@ public class HandlerRepository { public static final String TRIGGER_LOG_ID = "trigger_log_id"; public static final String TRIGGER_LOG_URL = "trigger_log_url"; + public static final String TRIGGER_TIMESTAMP = "trigger_timestamp"; public static ConcurrentHashMap handlerTreadMap = new ConcurrentHashMap(); @@ -41,6 +42,13 @@ public class HandlerRepository { RemoteCallBack callback = new RemoteCallBack(); callback.setStatus(RemoteCallBack.FAIL); + // encryption check + long timestamp = _param.get(HandlerRepository.TRIGGER_TIMESTAMP)!=null?Long.valueOf(_param.get(HandlerRepository.TRIGGER_TIMESTAMP)):-1; + if (System.currentTimeMillis() - timestamp > 60000) { + callback.setMsg("Timestamp check failed."); + return JacksonUtil.writeValueAsString(callback); + } + // push data to queue String handler_name = _param.get(HandlerRepository.HANDLER_NAME); if (handler_name!=null && handler_name.trim().length()>0) { diff --git a/xxl-job-client/src/main/java/com/xxl/job/client/handler/HandlerThread.java b/xxl-job-client/src/main/java/com/xxl/job/client/handler/HandlerThread.java index db7bd888..aa8c401b 100644 --- a/xxl-job-client/src/main/java/com/xxl/job/client/handler/HandlerThread.java +++ b/xxl-job-client/src/main/java/com/xxl/job/client/handler/HandlerThread.java @@ -7,6 +7,7 @@ import java.util.Map; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import org.eclipse.jetty.util.ConcurrentHashSet; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -23,14 +24,18 @@ public class HandlerThread extends Thread{ private IJobHandler handler; private LinkedBlockingQueue> handlerDataQueue; + private ConcurrentHashSet logIdSet; // avoid repeat trigger for the same TRIGGER_LOG_ID public HandlerThread(IJobHandler handler) { this.handler = handler; handlerDataQueue = new LinkedBlockingQueue>(); + logIdSet = new ConcurrentHashSet(); } public void pushData(Map param) { - handlerDataQueue.offer(param); + if (param.get(HandlerRepository.TRIGGER_LOG_ID)!=null && !logIdSet.contains(param.get(HandlerRepository.TRIGGER_LOG_ID))) { + handlerDataQueue.offer(param); + } } int i = 1; @@ -38,12 +43,13 @@ public class HandlerThread extends Thread{ public void run() { while(true){ try { - i++; Map handlerData = handlerDataQueue.poll(); if (handlerData!=null) { + i= 0; String trigger_log_url = handlerData.get(HandlerRepository.TRIGGER_LOG_URL); String trigger_log_id = handlerData.get(HandlerRepository.TRIGGER_LOG_ID); String handler_params = handlerData.get(HandlerRepository.HANDLER_PARAMS); + logIdSet.remove(trigger_log_id); // parse param String[] handlerParams = null; @@ -68,9 +74,8 @@ public class HandlerThread extends Thread{ // callback handler info RemoteCallBack callback = null; try { - HashMap params = new HashMap(); - params.put(HandlerRepository.TRIGGER_LOG_ID, trigger_log_id); + params.put("trigger_log_id", trigger_log_id); params.put("status", _status.name()); params.put("msg", _msg); callback = HttpUtil.post(trigger_log_url, params); @@ -80,6 +85,8 @@ public class HandlerThread extends Thread{ logger.info("<<<<<<<<<<< xxl-job thread handle, handlerData:{}, callback_status:{}, callback_msg:{}, callback:{}, thread:{}", new Object[]{handlerData, _status, _msg, callback, this}); } else { + i++; + logIdSet.clear(); try { TimeUnit.MILLISECONDS.sleep(i * 100); } catch (InterruptedException e) {