diff --git a/xxl-job-admin/src/main/java/com/xxl/job/service/job/RemoteHttpJobBean.java b/xxl-job-admin/src/main/java/com/xxl/job/service/job/RemoteHttpJobBean.java index b61d956b..cf0b0cb4 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/service/job/RemoteHttpJobBean.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/service/job/RemoteHttpJobBean.java @@ -57,6 +57,7 @@ public class RemoteHttpJobBean extends QuartzJobBean { HashMap params = new HashMap(); params.put(HandlerRepository.TRIGGER_LOG_URL, PropertiesUtil.getString(HandlerRepository.TRIGGER_LOG_URL)); params.put(HandlerRepository.TRIGGER_LOG_ID, String.valueOf(jobLog.getId())); + params.put(HandlerRepository.TRIGGER_TIMESTAMP, String.valueOf(System.currentTimeMillis())); params.put(HandlerRepository.HANDLER_NAME, jobDataMap.get(HandlerRepository.HANDLER_NAME)); params.put(HandlerRepository.HANDLER_PARAMS, jobDataMap.get(HandlerRepository.HANDLER_PARAMS)); diff --git a/xxl-job-client/src/main/java/com/xxl/job/client/handler/HandlerRepository.java b/xxl-job-client/src/main/java/com/xxl/job/client/handler/HandlerRepository.java index 71255c02..4fdfd0b4 100644 --- a/xxl-job-client/src/main/java/com/xxl/job/client/handler/HandlerRepository.java +++ b/xxl-job-client/src/main/java/com/xxl/job/client/handler/HandlerRepository.java @@ -22,6 +22,7 @@ public class HandlerRepository { public static final String TRIGGER_LOG_ID = "trigger_log_id"; public static final String TRIGGER_LOG_URL = "trigger_log_url"; + public static final String TRIGGER_TIMESTAMP = "trigger_timestamp"; public static ConcurrentHashMap handlerTreadMap = new ConcurrentHashMap(); @@ -41,6 +42,13 @@ public class HandlerRepository { RemoteCallBack callback = new RemoteCallBack(); callback.setStatus(RemoteCallBack.FAIL); + // encryption check + long timestamp = _param.get(HandlerRepository.TRIGGER_TIMESTAMP)!=null?Long.valueOf(_param.get(HandlerRepository.TRIGGER_TIMESTAMP)):-1; + if (System.currentTimeMillis() - timestamp > 60000) { + callback.setMsg("Timestamp check failed."); + return JacksonUtil.writeValueAsString(callback); + } + // push data to queue String handler_name = _param.get(HandlerRepository.HANDLER_NAME); if (handler_name!=null && handler_name.trim().length()>0) { diff --git a/xxl-job-client/src/main/java/com/xxl/job/client/handler/HandlerThread.java b/xxl-job-client/src/main/java/com/xxl/job/client/handler/HandlerThread.java index db7bd888..aa8c401b 100644 --- a/xxl-job-client/src/main/java/com/xxl/job/client/handler/HandlerThread.java +++ b/xxl-job-client/src/main/java/com/xxl/job/client/handler/HandlerThread.java @@ -7,6 +7,7 @@ import java.util.Map; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import org.eclipse.jetty.util.ConcurrentHashSet; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -23,14 +24,18 @@ public class HandlerThread extends Thread{ private IJobHandler handler; private LinkedBlockingQueue> handlerDataQueue; + private ConcurrentHashSet logIdSet; // avoid repeat trigger for the same TRIGGER_LOG_ID public HandlerThread(IJobHandler handler) { this.handler = handler; handlerDataQueue = new LinkedBlockingQueue>(); + logIdSet = new ConcurrentHashSet(); } public void pushData(Map param) { - handlerDataQueue.offer(param); + if (param.get(HandlerRepository.TRIGGER_LOG_ID)!=null && !logIdSet.contains(param.get(HandlerRepository.TRIGGER_LOG_ID))) { + handlerDataQueue.offer(param); + } } int i = 1; @@ -38,12 +43,13 @@ public class HandlerThread extends Thread{ public void run() { while(true){ try { - i++; Map handlerData = handlerDataQueue.poll(); if (handlerData!=null) { + i= 0; String trigger_log_url = handlerData.get(HandlerRepository.TRIGGER_LOG_URL); String trigger_log_id = handlerData.get(HandlerRepository.TRIGGER_LOG_ID); String handler_params = handlerData.get(HandlerRepository.HANDLER_PARAMS); + logIdSet.remove(trigger_log_id); // parse param String[] handlerParams = null; @@ -68,9 +74,8 @@ public class HandlerThread extends Thread{ // callback handler info RemoteCallBack callback = null; try { - HashMap params = new HashMap(); - params.put(HandlerRepository.TRIGGER_LOG_ID, trigger_log_id); + params.put("trigger_log_id", trigger_log_id); params.put("status", _status.name()); params.put("msg", _msg); callback = HttpUtil.post(trigger_log_url, params); @@ -80,6 +85,8 @@ public class HandlerThread extends Thread{ logger.info("<<<<<<<<<<< xxl-job thread handle, handlerData:{}, callback_status:{}, callback_msg:{}, callback:{}, thread:{}", new Object[]{handlerData, _status, _msg, callback, this}); } else { + i++; + logIdSet.clear(); try { TimeUnit.MILLISECONDS.sleep(i * 100); } catch (InterruptedException e) {