From 0b4849bb61f68e7ff5406866df447039ac989731 Mon Sep 17 00:00:00 2001 From: xuxueli <931591021@qq.com> Date: Sat, 26 Aug 2017 12:20:04 +0800 Subject: [PATCH] =?UTF-8?q?=E6=89=A7=E8=A1=8C=E5=99=A8=E6=89=8B=E5=8A=A8?= =?UTF-8?q?=E8=AE=BE=E7=BD=AEIP=E6=97=B6=E5=B0=86=E4=BC=9A=E7=BB=91?= =?UTF-8?q?=E5=AE=9AHost=EF=BC=9B=20=E6=89=A7=E8=A1=8C=E5=99=A8=E5=9B=9E?= =?UTF-8?q?=E8=B0=83=E7=BA=BF=E7=A8=8B=E4=BC=98=E5=8C=96=EF=BC=8C=E7=BA=BF?= =?UTF-8?q?=E7=A8=8B=E9=94=80=E6=AF=81=E5=89=8D=E6=89=B9=E9=87=8F=E5=9B=9E?= =?UTF-8?q?=E8=B0=83=E9=98=9F=E5=88=97=E4=B8=AD=E6=89=80=E6=9C=89=E6=95=B0?= =?UTF-8?q?=E6=8D=AE=EF=BC=9B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- doc/XXL-JOB官方文档.md | 10 ++- .../src/main/webapp/static/js/index.js | 2 +- .../rpc/netcom/jetty/server/JettyServer.java | 3 + .../core/thread/ExecutorRegistryThread.java | 6 +- .../core/thread/TriggerCallbackThread.java | 80 +++++++++++++------ 5 files changed, 70 insertions(+), 31 deletions(-) diff --git a/doc/XXL-JOB官方文档.md b/doc/XXL-JOB官方文档.md index 4fe69bf8..8dd9c976 100644 --- a/doc/XXL-JOB官方文档.md +++ b/doc/XXL-JOB官方文档.md @@ -238,7 +238,7 @@ XXL-JOB是一个轻量级分布式任务调度框架,其核心设计目标是 ### xxl-job admin address list:调度中心部署跟地址:如调度中心集群部署存在多个地址则用逗号分隔。执行器将会使用该地址进行"执行器心跳注册"和"任务结果回调"。 xxl.job.admin.addresses=http://127.0.0.1:8080/xxl-job-admin - ### xxl-job executor address:执行器"AppName"和地址信息配置:AppName执行器心跳注册分组依据;地址信息用于"调度中心请求并触发任务"和"执行器注册"。执行器默认端口为9999,执行器IP默认为空表示自动获取IP,多网卡时可手动设置指定IP。单机部署多个执行器时,注意要配置不同执行器端口; + ### xxl-job executor address:执行器"AppName"和地址信息配置:AppName执行器心跳注册分组依据;地址信息用于"调度中心请求并触发任务"和"执行器注册"。执行器默认端口为9999,执行器IP默认为空表示自动获取IP,多网卡时可手动设置指定IP,手动设置IP时将会绑定Host。单机部署多个执行器时,注意要配置不同执行器端口; xxl.job.executor.appname=xxl-job-executor-sample xxl.job.executor.ip= xxl.job.executor.port=9999 @@ -984,6 +984,9 @@ Tips: 历史版本(V1.3.x)目前已经Release至稳定版本, 进入维护阶段 - 1、解决执行器回调URL不支持配置HTTPS时问题; - 2、规范项目目录,方便扩展多执行器; - 3、新增JFinal类型执行器sample示例项目; +- 4、执行器手动设置IP时将会绑定Host; +- 5、项目主页搭建,提供中英文文档; +- 6、执行器回调线程优化,线程销毁前批量回调队列中所有数据; ### TODO LIST - 1、任务权限管理:执行器为粒度分配权限,核心操作校验权限; @@ -994,11 +997,10 @@ Tips: 历史版本(V1.3.x)目前已经Release至稳定版本, 进入维护阶段 - 6、调度任务优先级; - 7、移除quartz依赖,重写调度模块:新增或恢复任务时将下次执行记录插入delayqueue,调度中心集群竞争分布式锁,成功节点批量加载到期delayqueue数据,批量执行。 - 8、springboot 和 docker镜像,并且推送docker镜像到中央仓库,更进一步实现产品开箱即用; -- 9、国际化:调度中心界面 + 官方文档,新增英文版本。 +- 9、国际化:调度中心界面。 - 10、执行器摘除:执行器销毁时,主动通知调度中心并摘除对应执行器节点,提高执行器状态感知的时效性。 - 11、调度中心API服务:支持API方式触发任务执行; -- 12、执行器启动,设定IP的时候,主动绑定IP,避免攻击; -- 13、任务参数类型改为string,进一步方便参数传递; +- 12、任务参数类型改为string,进一步方便参数传递; ## 七、其他 diff --git a/xxl-job-admin/src/main/webapp/static/js/index.js b/xxl-job-admin/src/main/webapp/static/js/index.js index 0f74d16b..fa5e578e 100644 --- a/xxl-job-admin/src/main/webapp/static/js/index.js +++ b/xxl-job-admin/src/main/webapp/static/js/index.js @@ -122,7 +122,7 @@ $(function () { }, series : [ { - name: '访问来源', + name: '分布比例', type: 'pie', radius : '55%', center: ['50%', '60%'], diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/rpc/netcom/jetty/server/JettyServer.java b/xxl-job-core/src/main/java/com/xxl/job/core/rpc/netcom/jetty/server/JettyServer.java index 0b9621bc..62754f3a 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/rpc/netcom/jetty/server/JettyServer.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/rpc/netcom/jetty/server/JettyServer.java @@ -30,6 +30,9 @@ public class JettyServer { // HTTP connector ServerConnector connector = new ServerConnector(server); + if (ip!=null && ip.trim().length()>0) { + connector.setHost(ip); // The network interface this connector binds to as an IP address or a hostname. If null or 0.0.0.0, then bind to all interfaces. + } connector.setPort(port); server.setConnectors(new Connector[]{connector}); diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/thread/ExecutorRegistryThread.java b/xxl-job-core/src/main/java/com/xxl/job/core/thread/ExecutorRegistryThread.java index c3e7eca2..52c070d3 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/thread/ExecutorRegistryThread.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/thread/ExecutorRegistryThread.java @@ -23,7 +23,7 @@ public class ExecutorRegistryThread extends Thread { } private Thread registryThread; - private boolean toStop = false; + private volatile boolean toStop = false; public void start(final int port, final String ip, final String appName){ // valid @@ -75,6 +75,10 @@ public class ExecutorRegistryThread extends Thread { logger.error(e.getMessage(), e); } } + + // registry remove + + } }); registryThread.setDaemon(true); diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/thread/TriggerCallbackThread.java b/xxl-job-core/src/main/java/com/xxl/job/core/thread/TriggerCallbackThread.java index 8bb5463e..c5d05775 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/thread/TriggerCallbackThread.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/thread/TriggerCallbackThread.java @@ -22,15 +22,34 @@ public class TriggerCallbackThread { return instance; } + /** + * job results callback queue + */ private LinkedBlockingQueue callBackQueue = new LinkedBlockingQueue(); + public static void pushCallBack(HandleCallbackParam callback){ + getInstance().callBackQueue.add(callback); + logger.debug(">>>>>>>>>>> xxl-job, push callback request, logId:{}", callback.getLogId()); + } + /** + * callback thread + */ private Thread triggerCallbackThread; - private boolean toStop = false; + private volatile boolean toStop = false; public void start() { + + // valid + if (XxlJobExecutor.getAdminBizList() == null) { + logger.warn(">>>>>>>>>>>> xxl-job, executor callback config fail, adminAddresses is null."); + return; + } + triggerCallbackThread = new Thread(new Runnable() { @Override public void run() { + + // normal callback while(!toStop){ try { HandleCallbackParam callback = getInstance().callBackQueue.take(); @@ -41,34 +60,27 @@ public class TriggerCallbackThread { int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList); callbackParamList.add(callback); - // valid - if (XxlJobExecutor.getAdminBizList()==null) { - logger.warn(">>>>>>>>>>>> xxl-job callback fail, adminAddresses is null, callbackParamList:{}", callbackParamList); - continue; - } - // callback, will retry if error - for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) { - try { - ReturnT callbackResult = adminBiz.callback(callbackParamList); - if (callbackResult!=null && ReturnT.SUCCESS_CODE == callbackResult.getCode()) { - callbackResult = ReturnT.SUCCESS; - logger.info(">>>>>>>>>>> xxl-job callback success, callbackParamList:{}, callbackResult:{}", new Object[]{callbackParamList, callbackResult}); - break; - } else { - logger.info(">>>>>>>>>>> xxl-job callback fail, callbackParamList:{}, callbackResult:{}", new Object[]{callbackParamList, callbackResult}); - } - } catch (Exception e) { - logger.error(">>>>>>>>>>> xxl-job callback error, callbackParamList:{}", callbackParamList, e); - //getInstance().callBackQueue.addAll(callbackParamList); - } + if (callbackParamList!=null && callbackParamList.size()>0) { + doCallback(callbackParamList); } - } } catch (Exception e) { logger.error(e.getMessage(), e); } } + + // last callback + try { + List callbackParamList = new ArrayList(); + int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList); + if (callbackParamList!=null && callbackParamList.size()>0) { + doCallback(callbackParamList); + } + } catch (Exception e) { + logger.error(e.getMessage(), e); + } + } }); triggerCallbackThread.setDaemon(true); @@ -78,9 +90,27 @@ public class TriggerCallbackThread { toStop = true; } - public static void pushCallBack(HandleCallbackParam callback){ - getInstance().callBackQueue.add(callback); - logger.debug(">>>>>>>>>>> xxl-job, push callback request, logId:{}", callback.getLogId()); + /** + * do callback, will retry if error + * @param callbackParamList + */ + private void doCallback(List callbackParamList){ + // callback, will retry if error + for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) { + try { + ReturnT callbackResult = adminBiz.callback(callbackParamList); + if (callbackResult!=null && ReturnT.SUCCESS_CODE == callbackResult.getCode()) { + callbackResult = ReturnT.SUCCESS; + logger.info(">>>>>>>>>>> xxl-job callback success, callbackParamList:{}, callbackResult:{}", new Object[]{callbackParamList, callbackResult}); + break; + } else { + logger.info(">>>>>>>>>>> xxl-job callback fail, callbackParamList:{}, callbackResult:{}", new Object[]{callbackParamList, callbackResult}); + } + } catch (Exception e) { + logger.error(">>>>>>>>>>> xxl-job callback error, callbackParamList:{}", callbackParamList, e); + //getInstance().callBackQueue.addAll(callbackParamList); + } + } } }