pull/56/head
liyong 4 years ago
parent bee703c790
commit 5aa7b240c9

@ -46,16 +46,18 @@ public class ExecutorBizImpl implements ExecutorBiz {
@Override @Override
public ReturnT<String> run(TriggerParam triggerParam) { public ReturnT<String> run(TriggerParam triggerParam) {
// load oldjobHandler + jobThread // load oldjobHandler + jobThread
//优先获取缓存中的处理器 (可以自定义设置) //TODO 获取有没有正在执行的线程
JobThread jobThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId()); JobThread jobThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId());
IJobHandler jobHandler = jobThread!=null?jobThread.getHandler():null; IJobHandler jobHandler = jobThread!=null?jobThread.getHandler():null;
String removeOldReason = null; String removeOldReason = null;
// validjobHandler + jobThread // validjobHandler + jobThread
GlueTypeEnum glueTypeEnum = GlueTypeEnum.match(triggerParam.getGlueType()); GlueTypeEnum glueTypeEnum = GlueTypeEnum.match(triggerParam.getGlueType());
//TODO 最常用的 bean 模式
if (GlueTypeEnum.BEAN == glueTypeEnum) { if (GlueTypeEnum.BEAN == glueTypeEnum) {
// new jobhandler 通过解析对象时包装的map获取 // new jobhandler 通过解析对象时包装的map获取
//TODO 通过一开始解析的xxljob注解 获取对应的对象
IJobHandler newJobHandler = XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler()); IJobHandler newJobHandler = XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler());
// valid old jobThread // valid old jobThread
@ -75,7 +77,9 @@ public class ExecutorBizImpl implements ExecutorBiz {
} }
} }
} else if (GlueTypeEnum.GLUE_GROOVY == glueTypeEnum) { }
//groovy 脚本 忽略
else if (GlueTypeEnum.GLUE_GROOVY == glueTypeEnum) {
// valid old jobThread // valid old jobThread
if (jobThread != null && if (jobThread != null &&
@ -98,7 +102,9 @@ public class ExecutorBizImpl implements ExecutorBiz {
return new ReturnT<String>(ReturnT.FAIL_CODE, e.getMessage()); return new ReturnT<String>(ReturnT.FAIL_CODE, e.getMessage());
} }
} }
} else if (glueTypeEnum!=null && glueTypeEnum.isScript()) { }
//其他脚本模式 忽略
else if (glueTypeEnum!=null && glueTypeEnum.isScript()) {
// valid old jobThread // valid old jobThread
if (jobThread != null && if (jobThread != null &&
@ -121,7 +127,7 @@ public class ExecutorBizImpl implements ExecutorBiz {
// executor block strategy // executor block strategy
if (jobThread != null) { if (jobThread != null) {
//阻塞处理策略 //TODO 阻塞处理策略
ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null); ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null);
if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) { if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) {
//丢弃后续调用 //丢弃后续调用
@ -145,11 +151,12 @@ public class ExecutorBizImpl implements ExecutorBiz {
// replace thread (new or exists invalid) // replace thread (new or exists invalid)
if (jobThread == null) { if (jobThread == null) {
//开启一个执行业务的线程 如果是覆盖之前的调用 则会触发之前任务的 interrupt //TODO 开启一个执行业务的线程 如果是覆盖之前的调用 则会触发之前任务的 interrupt
jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason); jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason);
} }
// push data to queue // push data to queue
//TODO 讲当前任务添加进队列 上一步开启的线程 会扫描这个队列
ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam); ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam);
return pushResult; return pushResult;
} }

@ -67,20 +67,22 @@ public class XxlJobExecutor {
// ---------------------- start + stop ---------------------- // ---------------------- start + stop ----------------------
public void start() throws Exception { public void start() throws Exception {
// init logpath //TODO 初始化日志路径
XxlJobFileAppender.initLogPath(logPath); XxlJobFileAppender.initLogPath(logPath);
// init invoker, admin-client // init invoker, admin-client
//TODO 初始化服务器地址
initAdminBizList(adminAddresses, accessToken); initAdminBizList(adminAddresses, accessToken);
// init JobLogFileCleanThread 开启一个线程清理日志 //TODO init JobLogFileCleanThread 开启一个线程清理日志
JobLogFileCleanThread.getInstance().start(logRetentionDays); JobLogFileCleanThread.getInstance().start(logRetentionDays);
// init TriggerCallbackThread //TODO init TriggerCallbackThread 调用服务端 /api/callback 反馈任务执行结果
TriggerCallbackThread.getInstance().start(); TriggerCallbackThread.getInstance().start();
// init executor-server // init executor-server
//TODO 初始化XXLJOB服务
initEmbedServer(address, ip, port, appname, accessToken); initEmbedServer(address, ip, port, appname, accessToken);
} }
public void destroy(){ public void destroy(){
@ -164,6 +166,7 @@ public class XxlJobExecutor {
// start // start
embedServer = new EmbedServer(); embedServer = new EmbedServer();
//TODO
embedServer.start(address, port, appname, accessToken); embedServer.start(address, port, appname, accessToken);
} }

@ -39,13 +39,15 @@ public class XxlJobSpringExecutor extends XxlJobExecutor implements ApplicationC
/*initJobHandlerRepository(applicationContext);*/ /*initJobHandlerRepository(applicationContext);*/
// init JobHandler Repository (for method) // init JobHandler Repository (for method)
//初始化xxljob注解对象
initJobHandlerMethodRepository(applicationContext); initJobHandlerMethodRepository(applicationContext);
//TODO 初始化 glueFactory 使用场景待定 应该是执行脚本用的 //初始化 glueFactory 用于执行groovy脚本
GlueFactory.refreshInstance(1); GlueFactory.refreshInstance(1);
// super start // super start
try { try {
//TODO
super.start(); super.start();
} catch (Exception e) { } catch (Exception e) {
throw new RuntimeException(e); throw new RuntimeException(e);

@ -43,6 +43,7 @@ public class EmbedServer {
// param // param
EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup();
//TODO 用于接收服务端请求的线程池
ThreadPoolExecutor bizThreadPool = new ThreadPoolExecutor( ThreadPoolExecutor bizThreadPool = new ThreadPoolExecutor(
0, 0,
200, 200,
@ -75,7 +76,7 @@ public class EmbedServer {
.addLast(new IdleStateHandler(0, 0, 30 * 3, TimeUnit.SECONDS)) // beat 3N, close if idle .addLast(new IdleStateHandler(0, 0, 30 * 3, TimeUnit.SECONDS)) // beat 3N, close if idle
.addLast(new HttpServerCodec()) .addLast(new HttpServerCodec())
.addLast(new HttpObjectAggregator(5 * 1024 * 1024)) // merge request & reponse to FULL .addLast(new HttpObjectAggregator(5 * 1024 * 1024)) // merge request & reponse to FULL
//执行 //TODO 接收请求
.addLast(new EmbedHttpServerHandler(executorBiz, accessToken, bizThreadPool)); .addLast(new EmbedHttpServerHandler(executorBiz, accessToken, bizThreadPool));
} }
}) })
@ -86,7 +87,7 @@ public class EmbedServer {
logger.info(">>>>>>>>>>> xxl-job remoting server start success, nettype = {}, port = {}", EmbedServer.class, port); logger.info(">>>>>>>>>>> xxl-job remoting server start success, nettype = {}, port = {}", EmbedServer.class, port);
//注册当前节点 //TODO 调用 api/registry 注册当前节点
startRegistry(appname, address); startRegistry(appname, address);
// wait util stop // wait util stop
@ -159,11 +160,12 @@ public class EmbedServer {
boolean keepAlive = HttpUtil.isKeepAlive(msg); boolean keepAlive = HttpUtil.isKeepAlive(msg);
String accessTokenReq = msg.headers().get(XxlJobRemotingUtil.XXL_JOB_ACCESS_TOKEN); String accessTokenReq = msg.headers().get(XxlJobRemotingUtil.XXL_JOB_ACCESS_TOKEN);
// invoke // TODO 接收请求以后立刻提交给另一个线程池
bizThreadPool.execute(new Runnable() { bizThreadPool.execute(new Runnable() {
@Override @Override
public void run() { public void run() {
// do invoke // do invoke
//TODO
Object responseObj = process(httpMethod, uri, requestData, accessTokenReq); Object responseObj = process(httpMethod, uri, requestData, accessTokenReq);
// to json // to json
@ -177,7 +179,7 @@ public class EmbedServer {
private Object process(HttpMethod httpMethod, String uri, String requestData, String accessTokenReq) { private Object process(HttpMethod httpMethod, String uri, String requestData, String accessTokenReq) {
// valid //#################### 校验
if (HttpMethod.POST != httpMethod) { if (HttpMethod.POST != httpMethod) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, HttpMethod not support."); return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, HttpMethod not support.");
} }
@ -189,21 +191,29 @@ public class EmbedServer {
&& !accessToken.equals(accessTokenReq)) { && !accessToken.equals(accessTokenReq)) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "The access token is wrong."); return new ReturnT<String>(ReturnT.FAIL_CODE, "The access token is wrong.");
} }
//######################
// services mapping // services mapping
try { try {
if ("/beat".equals(uri)) { if ("/beat".equals(uri)) {
//TODO 用于 路由策略 故障转移
return executorBiz.beat(); return executorBiz.beat();
} else if ("/idleBeat".equals(uri)) { } else if ("/idleBeat".equals(uri)) {
//TODO 用于 路由策略 忙碌转移
IdleBeatParam idleBeatParam = GsonTool.fromJson(requestData, IdleBeatParam.class); IdleBeatParam idleBeatParam = GsonTool.fromJson(requestData, IdleBeatParam.class);
return executorBiz.idleBeat(idleBeatParam); return executorBiz.idleBeat(idleBeatParam);
} else if ("/run".equals(uri)) { } else if ("/run".equals(uri)) {
//TODO 执行任务
TriggerParam triggerParam = GsonTool.fromJson(requestData, TriggerParam.class); TriggerParam triggerParam = GsonTool.fromJson(requestData, TriggerParam.class);
return executorBiz.run(triggerParam); return executorBiz.run(triggerParam);
} else if ("/kill".equals(uri)) { } else if ("/kill".equals(uri)) {
//TODO 中止任务
KillParam killParam = GsonTool.fromJson(requestData, KillParam.class); KillParam killParam = GsonTool.fromJson(requestData, KillParam.class);
return executorBiz.kill(killParam); return executorBiz.kill(killParam);
} else if ("/log".equals(uri)) { } else if ("/log".equals(uri)) {
//TODO 查询日志
LogParam logParam = GsonTool.fromJson(requestData, LogParam.class); LogParam logParam = GsonTool.fromJson(requestData, LogParam.class);
return executorBiz.log(logParam); return executorBiz.log(logParam);
} else { } else {

@ -76,7 +76,7 @@ public class ExecutorRegistryThread {
} }
} }
// registry remove 如果通过stop 中断了 注册过程 则会立刻调用 移除接口 // TODO registry remove 如果当前服务中止 则会立刻调用 移除接口
try { try {
RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address); RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address);
for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) { for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {

@ -98,7 +98,7 @@ public class JobThread extends Thread{
// init // init
try { try {
//调用 xxljob注解的 init方法 //TODO 调用 xxljob注解的 init方法
handler.init(); handler.init();
} catch (Throwable e) { } catch (Throwable e) {
//TODO 初始化方法报错 并不会影响正常流程执行 如果 初始化涉及业务相关 要注意 //TODO 初始化方法报错 并不会影响正常流程执行 如果 初始化涉及业务相关 要注意
@ -113,6 +113,7 @@ public class JobThread extends Thread{
TriggerParam triggerParam = null; TriggerParam triggerParam = null;
try { try {
// to check toStop signal, we need cycle, so wo cannot use queue.take(), instand of poll(timeout) // to check toStop signal, we need cycle, so wo cannot use queue.take(), instand of poll(timeout)
//TODO
triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS); triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS);
if (triggerParam!=null) { if (triggerParam!=null) {
running = true; running = true;
@ -125,7 +126,9 @@ public class JobThread extends Thread{
triggerParam.getJobId(), triggerParam.getJobId(),
triggerParam.getExecutorParams(), triggerParam.getExecutorParams(),
logFileName, logFileName,
//TODO 当前节点序号
triggerParam.getBroadcastIndex(), triggerParam.getBroadcastIndex(),
//TODO 节点总数
triggerParam.getBroadcastTotal()); triggerParam.getBroadcastTotal());
// init job context 包装执行的一些参数 通过 threadlocal 存取 // init job context 包装执行的一些参数 通过 threadlocal 存取
@ -134,7 +137,7 @@ public class JobThread extends Thread{
// execute // execute
XxlJobHelper.log("<br>----------- xxl-job job execute start -----------<br>----------- Param:" + xxlJobContext.getJobParam()); XxlJobHelper.log("<br>----------- xxl-job job execute start -----------<br>----------- Param:" + xxlJobContext.getJobParam());
//执行超时时间 //TODO 执行超时时间
if (triggerParam.getExecutorTimeout() > 0) { if (triggerParam.getExecutorTimeout() > 0) {
// limit timeout // limit timeout
Thread futureThread = null; Thread futureThread = null;
@ -187,7 +190,8 @@ public class JobThread extends Thread{
); );
} else { } else {
//每3秒获取一次执行参数 超过30次 且 依然没有 就 删除对应的线程 删除线程会将top置为true 这个设计应该是为了一些 执行频繁的任务设计的 一直死循环跑任务 //TODO 每3秒获取一次执行参数 超过30次 且 依然没有 就 删除对应的线程 删除线程会将top置为true 这个设计应该是为了一些 执行频繁的任务设计的 一直死循环跑任务
//TODO 30这个数字应该可配置 线程存活时间太长了
if (idleTimes > 30) { if (idleTimes > 30) {
if(triggerQueue.size() == 0) { // avoid concurrent trigger causes jobId-lost if(triggerQueue.size() == 0) { // avoid concurrent trigger causes jobId-lost
XxlJobExecutor.removeJobThread(jobId, "excutor idel times over limit."); XxlJobExecutor.removeJobThread(jobId, "excutor idel times over limit.");
@ -212,6 +216,7 @@ public class JobThread extends Thread{
// callback handler info // callback handler info
if (!toStop) { if (!toStop) {
// commonm // commonm
//TODO 填充callBackQueue 队列 用于讲任务执行结果返回给服务端
TriggerCallbackThread.pushCallBack(new HandleCallbackParam( TriggerCallbackThread.pushCallBack(new HandleCallbackParam(
triggerParam.getLogId(), triggerParam.getLogId(),
triggerParam.getLogDateTime(), triggerParam.getLogDateTime(),

@ -33,7 +33,7 @@ public class TriggerCallbackThread {
} }
/** /**
* job results callback queue * job results callback queue
*/ */
private LinkedBlockingQueue<HandleCallbackParam> callBackQueue = new LinkedBlockingQueue<HandleCallbackParam>(); private LinkedBlockingQueue<HandleCallbackParam> callBackQueue = new LinkedBlockingQueue<HandleCallbackParam>();
public static void pushCallBack(HandleCallbackParam callback){ public static void pushCallBack(HandleCallbackParam callback){
@ -44,7 +44,7 @@ public class TriggerCallbackThread {
/** /**
* callback thread * callback thread
*/ */
/** TODO codemsg callback /** TODO codemsg
* xxl/api/callback * xxl/api/callback
* log triggerRetryCallbackThread 线30() * log triggerRetryCallbackThread 线30()
*/ */
@ -68,17 +68,19 @@ public class TriggerCallbackThread {
// normal callback // normal callback
while(!toStop){ while(!toStop){
try { try {
//TODO 从队列中取一个 阻塞方法
HandleCallbackParam callback = getInstance().callBackQueue.take(); HandleCallbackParam callback = getInstance().callBackQueue.take();
if (callback != null) { if (callback != null) {
// callback list param // callback list param
List<HandleCallbackParam> callbackParamList = new ArrayList<HandleCallbackParam>(); List<HandleCallbackParam> callbackParamList = new ArrayList<HandleCallbackParam>();
//将队列中所有对象全部取出 //TODO 将队列中所有对象全部取出
int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList); int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList);
callbackParamList.add(callback); callbackParamList.add(callback);
// callback, will retry if error // callback, will retry if error
if (callbackParamList!=null && callbackParamList.size()>0) { if (callbackParamList!=null && callbackParamList.size()>0) {
//TODO
doCallback(callbackParamList); doCallback(callbackParamList);
} }
} }
@ -90,6 +92,7 @@ public class TriggerCallbackThread {
} }
// last callback // last callback
//TODO 跳出循环 说明xxljob生命周期结束了 防止有数据还停留在队列中
try { try {
List<HandleCallbackParam> callbackParamList = new ArrayList<HandleCallbackParam>(); List<HandleCallbackParam> callbackParamList = new ArrayList<HandleCallbackParam>();
int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList); int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList);
@ -110,7 +113,7 @@ public class TriggerCallbackThread {
triggerCallbackThread.start(); triggerCallbackThread.start();
// retry // TODO 扫描上一个线程 请求异常的数据 重新发送
triggerRetryCallbackThread = new Thread(new Runnable() { triggerRetryCallbackThread = new Thread(new Runnable() {
@Override @Override
public void run() { public void run() {
@ -172,8 +175,10 @@ public class TriggerCallbackThread {
//getAdminBizList 所有xxl服务器地址 //getAdminBizList 所有xxl服务器地址
for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) { for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
try { try {
//TODO 调用服务端 callback接口
ReturnT<String> callbackResult = adminBiz.callback(callbackParamList); ReturnT<String> callbackResult = adminBiz.callback(callbackParamList);
if (callbackResult!=null && ReturnT.SUCCESS_CODE == callbackResult.getCode()) { if (callbackResult!=null && ReturnT.SUCCESS_CODE == callbackResult.getCode()) {
//日志记录
callbackLog(callbackParamList, "<br>----------- xxl-job job callback finish."); callbackLog(callbackParamList, "<br>----------- xxl-job job callback finish.");
callbackRet = true; callbackRet = true;
break; break;
@ -185,6 +190,7 @@ public class TriggerCallbackThread {
} }
} }
if (!callbackRet) { if (!callbackRet) {
//TODO 请求服务端异常处理(写进日志文件) 如果部分成功部分失败 是不是重复劳动
appendFailCallbackFile(callbackParamList); appendFailCallbackFile(callbackParamList);
} }
} }
@ -258,6 +264,7 @@ public class TriggerCallbackThread {
List<HandleCallbackParam> callbackParamList = (List<HandleCallbackParam>) JdkSerializeTool.deserialize(callbackParamList_bytes, List.class); List<HandleCallbackParam> callbackParamList = (List<HandleCallbackParam>) JdkSerializeTool.deserialize(callbackParamList_bytes, List.class);
//TODO 读取完了 直接删除
callbaclLogFile.delete(); callbaclLogFile.delete();
doCallback(callbackParamList); doCallback(callbackParamList);
} }

Loading…
Cancel
Save