xuxueli 3 years ago
commit a4db25db61

@ -776,7 +776,7 @@ docker run -e PARAMS="--spring.datasource.url=jdbc:mysql://127.0.0.1:3306/xxl_jo
执行器配置,配置内容说明: 执行器配置,配置内容说明:
### 调度中心部署地址 [选填]:如调度中心集群部署存在多个地址则用逗号分隔。执行器将会使用该地址进行"执行器心跳注册"和"任务结果回调";为空则关闭自动注册; ### 调度中心部署地址 [选填]:如调度中心集群部署存在多个地址则用逗号分隔。执行器将会使用该地址进行"执行器心跳注册"和"任务结果回调";为空则关闭自动注册;
xxl.job.admin.addresses=http://127.0.0.1:8080/xxl-job-admin xxl.job.admin.addresses=http://127.0.0.1:8080/xxl-job-admin
### 执行器通讯TOKEN [选填]:非空时启用; ### 执行器通讯TOKEN [选填]:非空时启用;
@ -1514,7 +1514,7 @@ API服务请求参考代码com.xxl.job.adminbiz.AdminBizTest
------ ------
地址格式:{调度中心跟地址}/callback 地址格式:{调度中心根地址}/api/callback
Header Header
XXL-JOB-ACCESS-TOKEN : {请求令牌} XXL-JOB-ACCESS-TOKEN : {请求令牌}
@ -1542,7 +1542,7 @@ Header
------ ------
地址格式:{调度中心跟地址}/registry 地址格式:{调度中心根地址}/api/registry
Header Header
XXL-JOB-ACCESS-TOKEN : {请求令牌} XXL-JOB-ACCESS-TOKEN : {请求令牌}
@ -1567,7 +1567,7 @@ Header
------ ------
地址格式:{调度中心跟地址}/registryRemove 地址格式:{调度中心根地址}/api/registryRemove
Header Header
XXL-JOB-ACCESS-TOKEN : {请求令牌} XXL-JOB-ACCESS-TOKEN : {请求令牌}
@ -1597,7 +1597,7 @@ API服务请求参考代码com.xxl.job.executorbiz.ExecutorBizTest
------ ------
地址格式:{执行器内嵌服务地址}/beat 地址格式:{执行器内嵌服务地址}/beat
Header Header
XXL-JOB-ACCESS-TOKEN : {请求令牌} XXL-JOB-ACCESS-TOKEN : {请求令牌}
@ -1617,7 +1617,7 @@ Header
------ ------
地址格式:{执行器内嵌服务地址}/idleBeat 地址格式:{执行器内嵌服务地址}/idleBeat
Header Header
XXL-JOB-ACCESS-TOKEN : {请求令牌} XXL-JOB-ACCESS-TOKEN : {请求令牌}
@ -1640,7 +1640,7 @@ Header
------ ------
地址格式:{执行器内嵌服务地址}/run 地址格式:{执行器内嵌服务地址}/run
Header Header
XXL-JOB-ACCESS-TOKEN : {请求令牌} XXL-JOB-ACCESS-TOKEN : {请求令牌}
@ -1674,7 +1674,7 @@ Header
------ ------
地址格式:{执行器内嵌服务地址}/kill 地址格式:{执行器内嵌服务地址}/kill
Header Header
XXL-JOB-ACCESS-TOKEN : {请求令牌} XXL-JOB-ACCESS-TOKEN : {请求令牌}
@ -1698,7 +1698,7 @@ Header
------ ------
地址格式:{执行器内嵌服务地址}/log 地址格式:{执行器内嵌服务地址}/log
Header Header
XXL-JOB-ACCESS-TOKEN : {请求令牌} XXL-JOB-ACCESS-TOKEN : {请求令牌}

@ -83,6 +83,7 @@ public class XxlJobExecutor {
// init executor-server // init executor-server
initEmbedServer(address, ip, port, appname, accessToken); initEmbedServer(address, ip, port, appname, accessToken);
} }
public void destroy(){ public void destroy(){
// destroy executor-server // destroy executor-server
stopEmbedServer(); stopEmbedServer();
@ -131,6 +132,7 @@ public class XxlJobExecutor {
} }
} }
} }
public static List<AdminBiz> getAdminBizList(){ public static List<AdminBiz> getAdminBizList(){
return adminBizList; return adminBizList;
} }
@ -251,6 +253,7 @@ public class XxlJobExecutor {
return newJobThread; return newJobThread;
} }
public static JobThread removeJobThread(int jobId, String removeOldReason){ public static JobThread removeJobThread(int jobId, String removeOldReason){
JobThread oldJobThread = jobThreadRepository.remove(jobId); JobThread oldJobThread = jobThreadRepository.remove(jobId);
if (oldJobThread != null) { if (oldJobThread != null) {
@ -261,9 +264,8 @@ public class XxlJobExecutor {
} }
return null; return null;
} }
public static JobThread loadJobThread(int jobId){ public static JobThread loadJobThread(int jobId){
JobThread jobThread = jobThreadRepository.get(jobId); return jobThreadRepository.get(jobId);
return jobThread;
} }
} }

@ -36,10 +36,8 @@ public class EmbedServer {
public void start(final String address, final int port, final String appname, final String accessToken) { public void start(final String address, final int port, final String appname, final String accessToken) {
executorBiz = new ExecutorBizImpl(); executorBiz = new ExecutorBizImpl();
thread = new Thread(new Runnable() { thread = new Thread(new Runnable() {
@Override @Override
public void run() { public void run() {
// param // param
EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup();
@ -61,8 +59,6 @@ public class EmbedServer {
throw new RuntimeException("xxl-job, EmbedServer bizThreadPool is EXHAUSTED!"); throw new RuntimeException("xxl-job, EmbedServer bizThreadPool is EXHAUSTED!");
} }
}); });
try { try {
// start server // start server
ServerBootstrap bootstrap = new ServerBootstrap(); ServerBootstrap bootstrap = new ServerBootstrap();
@ -92,11 +88,9 @@ public class EmbedServer {
future.channel().closeFuture().sync(); future.channel().closeFuture().sync();
} catch (InterruptedException e) { } catch (InterruptedException e) {
if (e instanceof InterruptedException) { logger.info(">>>>>>>>>>> xxl-job remoting server stop.");
logger.info(">>>>>>>>>>> xxl-job remoting server stop."); } catch (Exception e) {
} else { logger.error(">>>>>>>>>>> xxl-job remoting server error.", e);
logger.error(">>>>>>>>>>> xxl-job remoting server error.", e);
}
} finally { } finally {
// stop // stop
try { try {
@ -106,17 +100,15 @@ public class EmbedServer {
logger.error(e.getMessage(), e); logger.error(e.getMessage(), e);
} }
} }
} }
}); });
thread.setDaemon(true); // daemon, service jvm, user thread leave >>> daemon leave >>> jvm leave thread.setDaemon(true); // daemon, service jvm, user thread leave >>> daemon leave >>> jvm leave
thread.start(); thread.start();
} }
public void stop() throws Exception { public void stop() throws Exception {
// destroy server thread // destroy server thread
if (thread!=null && thread.isAlive()) { if (thread != null && thread.isAlive()) {
thread.interrupt(); thread.interrupt();
} }
@ -130,7 +122,7 @@ public class EmbedServer {
/** /**
* netty_http * netty_http
* * <p>
* Copy from : https://github.com/xuxueli/xxl-rpc * Copy from : https://github.com/xuxueli/xxl-rpc
* *
* @author xuxueli 2015-11-24 22:25:15 * @author xuxueli 2015-11-24 22:25:15
@ -141,6 +133,7 @@ public class EmbedServer {
private ExecutorBiz executorBiz; private ExecutorBiz executorBiz;
private String accessToken; private String accessToken;
private ThreadPoolExecutor bizThreadPool; private ThreadPoolExecutor bizThreadPool;
public EmbedHttpServerHandler(ExecutorBiz executorBiz, String accessToken, ThreadPoolExecutor bizThreadPool) { public EmbedHttpServerHandler(ExecutorBiz executorBiz, String accessToken, ThreadPoolExecutor bizThreadPool) {
this.executorBiz = executorBiz; this.executorBiz = executorBiz;
this.accessToken = accessToken; this.accessToken = accessToken;
@ -149,7 +142,6 @@ public class EmbedServer {
@Override @Override
protected void channelRead0(final ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception { protected void channelRead0(final ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception {
// request parse // request parse
//final byte[] requestBytes = ByteBufUtil.getBytes(msg.content()); // byteBuf.toString(io.netty.util.CharsetUtil.UTF_8); //final byte[] requestBytes = ByteBufUtil.getBytes(msg.content()); // byteBuf.toString(io.netty.util.CharsetUtil.UTF_8);
String requestData = msg.content().toString(CharsetUtil.UTF_8); String requestData = msg.content().toString(CharsetUtil.UTF_8);
@ -175,38 +167,38 @@ public class EmbedServer {
} }
private Object process(HttpMethod httpMethod, String uri, String requestData, String accessTokenReq) { private Object process(HttpMethod httpMethod, String uri, String requestData, String accessTokenReq) {
// valid // valid
if (HttpMethod.POST != httpMethod) { if (HttpMethod.POST != httpMethod) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, HttpMethod not support."); return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, HttpMethod not support.");
} }
if (uri==null || uri.trim().length()==0) { if (uri == null || uri.trim().length() == 0) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping empty."); return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping empty.");
} }
if (accessToken!=null if (accessToken != null
&& accessToken.trim().length()>0 && accessToken.trim().length() > 0
&& !accessToken.equals(accessTokenReq)) { && !accessToken.equals(accessTokenReq)) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "The access token is wrong."); return new ReturnT<String>(ReturnT.FAIL_CODE, "The access token is wrong.");
} }
// services mapping // services mapping
try { try {
if ("/beat".equals(uri)) { switch (uri) {
return executorBiz.beat(); case "/beat":
} else if ("/idleBeat".equals(uri)) { return executorBiz.beat();
IdleBeatParam idleBeatParam = GsonTool.fromJson(requestData, IdleBeatParam.class); case "/idleBeat":
return executorBiz.idleBeat(idleBeatParam); IdleBeatParam idleBeatParam = GsonTool.fromJson(requestData, IdleBeatParam.class);
} else if ("/run".equals(uri)) { return executorBiz.idleBeat(idleBeatParam);
TriggerParam triggerParam = GsonTool.fromJson(requestData, TriggerParam.class); case "/run":
return executorBiz.run(triggerParam); TriggerParam triggerParam = GsonTool.fromJson(requestData, TriggerParam.class);
} else if ("/kill".equals(uri)) { return executorBiz.run(triggerParam);
KillParam killParam = GsonTool.fromJson(requestData, KillParam.class); case "/kill":
return executorBiz.kill(killParam); KillParam killParam = GsonTool.fromJson(requestData, KillParam.class);
} else if ("/log".equals(uri)) { return executorBiz.kill(killParam);
LogParam logParam = GsonTool.fromJson(requestData, LogParam.class); case "/log":
return executorBiz.log(logParam); LogParam logParam = GsonTool.fromJson(requestData, LogParam.class);
} else { return executorBiz.log(logParam);
return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping("+ uri +") not found."); default:
return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping(" + uri + ") not found.");
} }
} catch (Exception e) { } catch (Exception e) {
logger.error(e.getMessage(), e); logger.error(e.getMessage(), e);
@ -261,6 +253,4 @@ public class EmbedServer {
// stop registry // stop registry
ExecutorRegistryThread.getInstance().toStop(); ExecutorRegistryThread.getInstance().toStop();
} }
} }

Loading…
Cancel
Save