From 73c2256f8100bdd3c14493866f7ac94eb98d99ab Mon Sep 17 00:00:00 2001 From: xieke Date: Tue, 17 Sep 2024 18:39:35 +0800 Subject: [PATCH] =?UTF-8?q?xxl=20=E6=A1=86=E6=9E=B6=20plugin=E5=88=9D?= =?UTF-8?q?=E5=A7=8B=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pom.xml | 3 +- wanhua-executor-plugins/pom.xml | 23 ++ .../pom.xml | 42 +++ ...ramelessStationV3CollectorApplication.java | 38 +++ .../config/StationV3CollectorJobConfig.java | 94 +++++++ .../jobhandler/StationV3CollectorJob.java | 251 ++++++++++++++++++ .../src/main/resources/log4j.xml | 27 ++ .../resources/xxl-job-executor.properties | 17 ++ 8 files changed, 494 insertions(+), 1 deletion(-) create mode 100644 wanhua-executor-plugins/pom.xml create mode 100644 wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/pom.xml create mode 100644 wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/src/main/java/com/wanhua/frameless/stationv3/FramelessStationV3CollectorApplication.java create mode 100644 wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/src/main/java/com/wanhua/frameless/stationv3/config/StationV3CollectorJobConfig.java create mode 100644 wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/src/main/java/com/wanhua/frameless/stationv3/jobhandler/StationV3CollectorJob.java create mode 100644 wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/src/main/resources/log4j.xml create mode 100644 wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/src/main/resources/xxl-job-executor.properties diff --git a/pom.xml b/pom.xml index ad5a2b4e..9d0058ae 100644 --- a/pom.xml +++ b/pom.xml @@ -14,7 +14,8 @@ xxl-job-core xxl-job-admin xxl-job-executor-samples - + wanhua-executor-plugins + UTF-8 diff --git a/wanhua-executor-plugins/pom.xml b/wanhua-executor-plugins/pom.xml new file mode 100644 index 00000000..9de2025e --- /dev/null +++ b/wanhua-executor-plugins/pom.xml @@ -0,0 +1,23 @@ + + + + xxl-job + com.xuxueli + 2.4.2-SNAPSHOT + + 4.0.0 + + wanhua-executor-plugins + pom + + wanhua-frameless-stationv3-collector-plugin + + + + 8 + 8 + + + \ No newline at end of file diff --git a/wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/pom.xml b/wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/pom.xml new file mode 100644 index 00000000..f5801840 --- /dev/null +++ b/wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/pom.xml @@ -0,0 +1,42 @@ + + + + wanhua-executor-plugins + com.xuxueli + 2.4.2-SNAPSHOT + + 4.0.0 + + wanhua-frameless-stationv3-collector-plugin + + + 8 + 8 + + + + + + org.slf4j + slf4j-log4j12 + ${slf4j-api.version} + + + + org.junit.jupiter + junit-jupiter-engine + ${junit-jupiter.version} + test + + + + + com.xuxueli + xxl-job-core + ${project.parent.version} + + + + \ No newline at end of file diff --git a/wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/src/main/java/com/wanhua/frameless/stationv3/FramelessStationV3CollectorApplication.java b/wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/src/main/java/com/wanhua/frameless/stationv3/FramelessStationV3CollectorApplication.java new file mode 100644 index 00000000..834fdfdc --- /dev/null +++ b/wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/src/main/java/com/wanhua/frameless/stationv3/FramelessStationV3CollectorApplication.java @@ -0,0 +1,38 @@ +package com.wanhua.frameless.stationv3; + +import com.wanhua.frameless.stationv3.config.StationV3CollectorJobConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.TimeUnit; + +/** + * @author xuxueli 2018-10-31 19:05:43 + */ +public class FramelessStationV3CollectorApplication { + private static Logger logger = LoggerFactory.getLogger(FramelessStationV3CollectorApplication.class); + + public static void main(String[] args) { + + try { + // start + StationV3CollectorJobConfig.getInstance().initXxlJobExecutor(); + + // Blocks until interrupted + while (true) { + try { + TimeUnit.HOURS.sleep(1); + } catch (InterruptedException e) { + break; + } + } + } catch (Exception e) { + logger.error(e.getMessage(), e); + } finally { + // destroy + StationV3CollectorJobConfig.getInstance().destroyXxlJobExecutor(); + } + + } + +} diff --git a/wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/src/main/java/com/wanhua/frameless/stationv3/config/StationV3CollectorJobConfig.java b/wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/src/main/java/com/wanhua/frameless/stationv3/config/StationV3CollectorJobConfig.java new file mode 100644 index 00000000..87a38807 --- /dev/null +++ b/wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/src/main/java/com/wanhua/frameless/stationv3/config/StationV3CollectorJobConfig.java @@ -0,0 +1,94 @@ +package com.wanhua.frameless.stationv3.config; + + +import com.xxl.job.core.executor.impl.XxlJobSimpleExecutor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import com.wanhua.frameless.stationv3.jobhandler.StationV3CollectorJob; + +import java.io.IOException; +import java.io.InputStreamReader; +import java.util.Arrays; +import java.util.Properties; + +/** + * @author xuxueli 2018-10-31 19:05:43 + */ +public class StationV3CollectorJobConfig { + private static Logger logger = LoggerFactory.getLogger(StationV3CollectorJobConfig.class); + + + private static StationV3CollectorJobConfig instance = new StationV3CollectorJobConfig(); + public static StationV3CollectorJobConfig getInstance() { + return instance; + } + + + private XxlJobSimpleExecutor xxlJobExecutor = null; + + /** + * init + */ + public void initXxlJobExecutor() { + + // load executor prop + Properties xxlJobProp = loadProperties("xxl-job-executor.properties"); + + // init executor + xxlJobExecutor = new XxlJobSimpleExecutor(); + xxlJobExecutor.setAdminAddresses(xxlJobProp.getProperty("xxl.job.admin.addresses")); + xxlJobExecutor.setAccessToken(xxlJobProp.getProperty("xxl.job.accessToken")); + xxlJobExecutor.setAppname(xxlJobProp.getProperty("xxl.job.executor.appname")); + xxlJobExecutor.setAddress(xxlJobProp.getProperty("xxl.job.executor.address")); + xxlJobExecutor.setIp(xxlJobProp.getProperty("xxl.job.executor.ip")); + xxlJobExecutor.setPort(Integer.valueOf(xxlJobProp.getProperty("xxl.job.executor.port"))); + xxlJobExecutor.setLogPath(xxlJobProp.getProperty("xxl.job.executor.logpath")); + xxlJobExecutor.setLogRetentionDays(Integer.valueOf(xxlJobProp.getProperty("xxl.job.executor.logretentiondays"))); + + // registry job bean + xxlJobExecutor.setXxlJobBeanList(Arrays.asList(new StationV3CollectorJob())); + + // start executor + try { + xxlJobExecutor.start(); + } catch (Exception e) { + logger.error(e.getMessage(), e); + } + } + + /** + * destroy + */ + public void destroyXxlJobExecutor() { + if (xxlJobExecutor != null) { + xxlJobExecutor.destroy(); + } + } + + + public static Properties loadProperties(String propertyFileName) { + InputStreamReader in = null; + try { + ClassLoader loder = Thread.currentThread().getContextClassLoader(); + + in = new InputStreamReader(loder.getResourceAsStream(propertyFileName), "UTF-8");; + if (in != null) { + Properties prop = new Properties(); + prop.load(in); + return prop; + } + } catch (IOException e) { + logger.error("load {} error!", propertyFileName); + } finally { + if (in != null) { + try { + in.close(); + } catch (IOException e) { + logger.error("close {} error!", propertyFileName); + } + } + } + return null; + } + +} diff --git a/wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/src/main/java/com/wanhua/frameless/stationv3/jobhandler/StationV3CollectorJob.java b/wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/src/main/java/com/wanhua/frameless/stationv3/jobhandler/StationV3CollectorJob.java new file mode 100644 index 00000000..fc66ffa7 --- /dev/null +++ b/wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/src/main/java/com/wanhua/frameless/stationv3/jobhandler/StationV3CollectorJob.java @@ -0,0 +1,251 @@ +package com.wanhua.frameless.stationv3.jobhandler; + +import com.xxl.job.core.context.XxlJobHelper; +import com.xxl.job.core.handler.annotation.XxlJob; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedInputStream; +import java.io.BufferedReader; +import java.io.DataOutputStream; +import java.io.InputStreamReader; +import java.net.HttpURLConnection; +import java.net.URL; +import java.util.Arrays; +import java.util.concurrent.TimeUnit; + +/** + * XxlJob开发示例(Bean模式) + * + * 开发步骤: + * 1、任务开发:在Spring Bean实例中,开发Job方法; + * 2、注解配置:为Job方法添加注解 "@XxlJob(value="自定义jobhandler名称", init = "JobHandler初始化方法", destroy = "JobHandler销毁方法")",注解value值对应的是调度中心新建任务的JobHandler属性的值。 + * 3、执行日志:需要通过 "XxlJobHelper.log" 打印执行日志; + * 4、任务结果:默认任务结果为 "成功" 状态,不需要主动设置;如有诉求,比如设置任务结果为失败,可以通过 "XxlJobHelper.handleFail/handleSuccess" 自主设置任务结果; + * + * @author xuxueli 2019-12-11 21:52:51 + */ +public class StationV3CollectorJob { + private static Logger logger = LoggerFactory.getLogger(StationV3CollectorJob.class); + + + /** + * 1、简单任务示例(Bean模式) + */ + @XxlJob("demoJobHandler") + public void demoJobHandler() throws Exception { + XxlJobHelper.log("XXL-JOB, Hello World."); + + for (int i = 0; i < 5; i++) { + XxlJobHelper.log("beat at:" + i); + TimeUnit.SECONDS.sleep(2); + } + // default success + } + + + /** + * 2、分片广播任务 + */ + @XxlJob("shardingJobHandler") + public void shardingJobHandler() throws Exception { + + // 分片参数 + int shardIndex = XxlJobHelper.getShardIndex(); + int shardTotal = XxlJobHelper.getShardTotal(); + + XxlJobHelper.log("分片参数:当前分片序号 = {}, 总分片数 = {}", shardIndex, shardTotal); + + // 业务逻辑 + for (int i = 0; i < shardTotal; i++) { + if (i == shardIndex) { + XxlJobHelper.log("第 {} 片, 命中分片开始处理", i); + } else { + XxlJobHelper.log("第 {} 片, 忽略", i); + } + } + + } + + + /** + * 3、命令行任务 + */ + @XxlJob("commandJobHandler") + public void commandJobHandler() throws Exception { + String command = XxlJobHelper.getJobParam(); + int exitValue = -1; + + BufferedReader bufferedReader = null; + try { + // command process + ProcessBuilder processBuilder = new ProcessBuilder(); + processBuilder.command(command); + processBuilder.redirectErrorStream(true); + + Process process = processBuilder.start(); + //Process process = Runtime.getRuntime().exec(command); + + BufferedInputStream bufferedInputStream = new BufferedInputStream(process.getInputStream()); + bufferedReader = new BufferedReader(new InputStreamReader(bufferedInputStream)); + + // command log + String line; + while ((line = bufferedReader.readLine()) != null) { + XxlJobHelper.log(line); + } + + // command exit + process.waitFor(); + exitValue = process.exitValue(); + } catch (Exception e) { + XxlJobHelper.log(e); + } finally { + if (bufferedReader != null) { + bufferedReader.close(); + } + } + + if (exitValue == 0) { + // default success + } else { + XxlJobHelper.handleFail("command exit value("+exitValue+") is failed"); + } + + } + + + /** + * 4、跨平台Http任务 + * 参数示例: + * "url: http://www.baidu.com\n" + + * "method: get\n" + + * "data: content\n"; + */ + @XxlJob("httpJobHandler") + public void httpJobHandler() throws Exception { + + // param parse + String param = XxlJobHelper.getJobParam(); + if (param==null || param.trim().length()==0) { + XxlJobHelper.log("param["+ param +"] invalid."); + + XxlJobHelper.handleFail(); + return; + } + + String[] httpParams = param.split("\n"); + String url = null; + String method = null; + String data = null; + for (String httpParam: httpParams) { + if (httpParam.startsWith("url:")) { + url = httpParam.substring(httpParam.indexOf("url:") + 4).trim(); + } + if (httpParam.startsWith("method:")) { + method = httpParam.substring(httpParam.indexOf("method:") + 7).trim().toUpperCase(); + } + if (httpParam.startsWith("data:")) { + data = httpParam.substring(httpParam.indexOf("data:") + 5).trim(); + } + } + + // param valid + if (url==null || url.trim().length()==0) { + XxlJobHelper.log("url["+ url +"] invalid."); + + XxlJobHelper.handleFail(); + return; + } + if (method==null || !Arrays.asList("GET", "POST").contains(method)) { + XxlJobHelper.log("method["+ method +"] invalid."); + + XxlJobHelper.handleFail(); + return; + } + boolean isPostMethod = method.equals("POST"); + + // request + HttpURLConnection connection = null; + BufferedReader bufferedReader = null; + try { + // connection + URL realUrl = new URL(url); + connection = (HttpURLConnection) realUrl.openConnection(); + + // connection setting + connection.setRequestMethod(method); + connection.setDoOutput(isPostMethod); + connection.setDoInput(true); + connection.setUseCaches(false); + connection.setReadTimeout(5 * 1000); + connection.setConnectTimeout(3 * 1000); + connection.setRequestProperty("connection", "Keep-Alive"); + connection.setRequestProperty("Content-Type", "application/json;charset=UTF-8"); + connection.setRequestProperty("Accept-Charset", "application/json;charset=UTF-8"); + + // do connection + connection.connect(); + + // data + if (isPostMethod && data!=null && data.trim().length()>0) { + DataOutputStream dataOutputStream = new DataOutputStream(connection.getOutputStream()); + dataOutputStream.write(data.getBytes("UTF-8")); + dataOutputStream.flush(); + dataOutputStream.close(); + } + + // valid StatusCode + int statusCode = connection.getResponseCode(); + if (statusCode != 200) { + throw new RuntimeException("Http Request StatusCode(" + statusCode + ") Invalid."); + } + + // result + bufferedReader = new BufferedReader(new InputStreamReader(connection.getInputStream(), "UTF-8")); + StringBuilder result = new StringBuilder(); + String line; + while ((line = bufferedReader.readLine()) != null) { + result.append(line); + } + String responseMsg = result.toString(); + + XxlJobHelper.log(responseMsg); + + return; + } catch (Exception e) { + XxlJobHelper.log(e); + + XxlJobHelper.handleFail(); + return; + } finally { + try { + if (bufferedReader != null) { + bufferedReader.close(); + } + if (connection != null) { + connection.disconnect(); + } + } catch (Exception e2) { + XxlJobHelper.log(e2); + } + } + + } + + /** + * 5、生命周期任务示例:任务初始化与销毁时,支持自定义相关逻辑; + */ + @XxlJob(value = "demoJobHandler2", init = "init", destroy = "destroy") + public void demoJobHandler2() throws Exception { + XxlJobHelper.log("XXL-JOB, Hello World."); + } + public void init(){ + logger.info("init"); + } + public void destroy(){ + logger.info("destroy"); + } + + +} diff --git a/wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/src/main/resources/log4j.xml b/wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/src/main/resources/log4j.xml new file mode 100644 index 00000000..896517e2 --- /dev/null +++ b/wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/src/main/resources/log4j.xml @@ -0,0 +1,27 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/src/main/resources/xxl-job-executor.properties b/wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/src/main/resources/xxl-job-executor.properties new file mode 100644 index 00000000..d21f35cb --- /dev/null +++ b/wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/src/main/resources/xxl-job-executor.properties @@ -0,0 +1,17 @@ +### xxl-job admin address list, such as "http://address" or "http://address01,http://address02" +xxl.job.admin.addresses=https://10.201.1.101:8082/internal/etl/xxl-job-admin + +### xxl-job, access token +xxl.job.accessToken=default_token + +### xxl-job executor appname +xxl.job.executor.appname=gsmsv3_collect +### xxl-job executor registry-address: default use address to registry , otherwise use ip:port if address is null +xxl.job.executor.address= +### xxl-job executor server-info +xxl.job.executor.ip= +xxl.job.executor.port=19998 +### xxl-job executor log-path +xxl.job.executor.logpath=logs/xxl-job/jobhandler +### xxl-job executor log-retention-days +xxl.job.executor.logretentiondays=30 \ No newline at end of file