|
|
|
|
@ -25,24 +25,23 @@ import java.util.concurrent.*;
|
|
|
|
|
public class JobThread extends Thread{
|
|
|
|
|
private static final Logger logger = LoggerFactory.getLogger(JobThread.class);
|
|
|
|
|
|
|
|
|
|
private int jobId;
|
|
|
|
|
private IJobHandler handler;
|
|
|
|
|
private LinkedBlockingQueue<TriggerRequest> triggerQueue;
|
|
|
|
|
private Set<Long> triggerLogIdSet; // avoid repeat trigger for the same TRIGGER_LOG_ID
|
|
|
|
|
private final int jobId;
|
|
|
|
|
private final IJobHandler handler;
|
|
|
|
|
private final LinkedBlockingQueue<TriggerRequest> triggerQueue;
|
|
|
|
|
private final Set<Long> triggerLogIdSet; // avoid repeat trigger for the same TRIGGER_LOG_ID
|
|
|
|
|
|
|
|
|
|
private volatile boolean toStop = false;
|
|
|
|
|
private volatile boolean toStop = false; // thread stop flag
|
|
|
|
|
private String stopReason;
|
|
|
|
|
|
|
|
|
|
private boolean running = false; // if running job
|
|
|
|
|
private int idleTimes = 0; // idle times
|
|
|
|
|
private boolean running = false; // if running job
|
|
|
|
|
private int idleTimes = 0; // idle times
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
public JobThread(int jobId, IJobHandler handler) {
|
|
|
|
|
this.jobId = jobId;
|
|
|
|
|
this.handler = handler;
|
|
|
|
|
this.triggerQueue = new LinkedBlockingQueue<>();
|
|
|
|
|
//this.triggerLogIdSet = Collections.synchronizedSet(new HashSet<Long>());
|
|
|
|
|
this.triggerLogIdSet = ConcurrentHashMap.newKeySet();
|
|
|
|
|
this.triggerLogIdSet = ConcurrentHashMap.newKeySet(); // Collections.synchronizedSet(new HashSet<Long>());
|
|
|
|
|
|
|
|
|
|
// assign job thread name
|
|
|
|
|
this.setName("xxl-job, JobThread-"+jobId+"-"+System.currentTimeMillis());
|
|
|
|
|
@ -57,8 +56,8 @@ public class JobThread extends Thread{
|
|
|
|
|
public Response<String> pushTriggerQueue(TriggerRequest triggerParam) {
|
|
|
|
|
// avoid repeat
|
|
|
|
|
if (!triggerLogIdSet.add(triggerParam.getLogId())) {
|
|
|
|
|
logger.info(">>>>>>>>>>> repeate trigger job, logId:{}", triggerParam.getLogId());
|
|
|
|
|
return Response.of(XxlJobContext.HANDLE_CODE_FAIL, "repeate trigger job, logId:" + triggerParam.getLogId());
|
|
|
|
|
logger.info(">>>>>>>>>>> repeat trigger job, logId:{}", triggerParam.getLogId());
|
|
|
|
|
return Response.of(XxlJobContext.HANDLE_CODE_FAIL, "repeat trigger job, logId:" + triggerParam.getLogId());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// push trigger queue
|
|
|
|
|
@ -83,20 +82,20 @@ public class JobThread extends Thread{
|
|
|
|
|
* is running job
|
|
|
|
|
*/
|
|
|
|
|
public boolean isRunningOrHasQueue() {
|
|
|
|
|
return running || triggerQueue.size()>0;
|
|
|
|
|
return running || !triggerQueue.isEmpty();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public void run() {
|
|
|
|
|
|
|
|
|
|
// init
|
|
|
|
|
// invoke init-method, only once
|
|
|
|
|
try {
|
|
|
|
|
handler.init();
|
|
|
|
|
} catch (Throwable e) {
|
|
|
|
|
logger.error(e.getMessage(), e);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// execute
|
|
|
|
|
// invoke job, listen schedule-center
|
|
|
|
|
while(!toStop){
|
|
|
|
|
running = false;
|
|
|
|
|
idleTimes++;
|
|
|
|
|
@ -236,7 +235,7 @@ public class JobThread extends Thread{
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// destroy
|
|
|
|
|
// invoke destroy-method, only once
|
|
|
|
|
try {
|
|
|
|
|
handler.destroy();
|
|
|
|
|
} catch (Throwable e) {
|
|
|
|
|
|