From 5db77de4aecf1708fd50cd573c6d5ce938bc0eb8 Mon Sep 17 00:00:00 2001 From: xieke Date: Tue, 17 Sep 2024 08:53:05 +0800 Subject: [PATCH 01/12] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E9=85=8D=E7=BD=AE?= =?UTF-8?q?=E6=96=87=E4=BB=B6xxl-job?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- xxl-job-admin/src/main/resources/application.properties | 9 +++++---- xxl-job-admin/src/main/resources/logback.xml | 2 +- .../src/main/resources/templates/common/common.macro.ftl | 3 ++- .../src/main/resources/xxl-job-executor.properties | 6 +++--- 4 files changed, 11 insertions(+), 9 deletions(-) diff --git a/xxl-job-admin/src/main/resources/application.properties b/xxl-job-admin/src/main/resources/application.properties index d438057d..ff2454a0 100644 --- a/xxl-job-admin/src/main/resources/application.properties +++ b/xxl-job-admin/src/main/resources/application.properties @@ -1,6 +1,7 @@ ### web -server.port=8080 -server.servlet.context-path=/xxl-job-admin +server.address=10.201.1.101 +server.port=18080 +server.servlet.context-path=/internal/etl/xxl-job-admin ### actuator management.server.servlet.context-path=/actuator @@ -24,9 +25,9 @@ mybatis.mapper-locations=classpath:/mybatis-mapper/*Mapper.xml #mybatis.type-aliases-package=com.xxl.job.admin.core.model ### xxl-job, datasource -spring.datasource.url=jdbc:mysql://127.0.0.1:3306/xxl_job?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&serverTimezone=Asia/Shanghai +spring.datasource.url=jdbc:mysql://10.201.1.90:3306/common_xxl_job?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&serverTimezone=Asia/Shanghai spring.datasource.username=root -spring.datasource.password=root_pwd +spring.datasource.password=dbmysql1205 spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver ### datasource-pool diff --git a/xxl-job-admin/src/main/resources/logback.xml b/xxl-job-admin/src/main/resources/logback.xml index d4b08c24..35b07923 100644 --- a/xxl-job-admin/src/main/resources/logback.xml +++ b/xxl-job-admin/src/main/resources/logback.xml @@ -2,7 +2,7 @@ logback - + diff --git a/xxl-job-admin/src/main/resources/templates/common/common.macro.ftl b/xxl-job-admin/src/main/resources/templates/common/common.macro.ftl index aace849f..89217d2e 100644 --- a/xxl-job-admin/src/main/resources/templates/common/common.macro.ftl +++ b/xxl-job-admin/src/main/resources/templates/common/common.macro.ftl @@ -58,7 +58,8 @@ <#-- common --> diff --git a/xxl-job-executor-samples/xxl-job-executor-sample-frameless/src/main/resources/xxl-job-executor.properties b/xxl-job-executor-samples/xxl-job-executor-sample-frameless/src/main/resources/xxl-job-executor.properties index 9b1ab8a7..0706e953 100644 --- a/xxl-job-executor-samples/xxl-job-executor-sample-frameless/src/main/resources/xxl-job-executor.properties +++ b/xxl-job-executor-samples/xxl-job-executor-sample-frameless/src/main/resources/xxl-job-executor.properties @@ -1,5 +1,5 @@ ### xxl-job admin address list, such as "http://address" or "http://address01,http://address02" -xxl.job.admin.addresses=http://127.0.0.1:8080/xxl-job-admin +xxl.job.admin.addresses=https://10.201.1.101:8082/internal/etl/xxl-job-admin ### xxl-job, access token xxl.job.accessToken=default_token @@ -10,8 +10,8 @@ xxl.job.executor.appname=xxl-job-executor-sample xxl.job.executor.address= ### xxl-job executor server-info xxl.job.executor.ip= -xxl.job.executor.port=9998 +xxl.job.executor.port=19998 ### xxl-job executor log-path -xxl.job.executor.logpath=/data/applogs/xxl-job/jobhandler +xxl.job.executor.logpath=logs/xxl-job/jobhandler ### xxl-job executor log-retention-days xxl.job.executor.logretentiondays=30 \ No newline at end of file From 73c2256f8100bdd3c14493866f7ac94eb98d99ab Mon Sep 17 00:00:00 2001 From: xieke Date: Tue, 17 Sep 2024 18:39:35 +0800 Subject: [PATCH 02/12] =?UTF-8?q?xxl=20=E6=A1=86=E6=9E=B6=20plugin?= =?UTF-8?q?=E5=88=9D=E5=A7=8B=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pom.xml | 3 +- wanhua-executor-plugins/pom.xml | 23 ++ .../pom.xml | 42 +++ ...ramelessStationV3CollectorApplication.java | 38 +++ .../config/StationV3CollectorJobConfig.java | 94 +++++++ .../jobhandler/StationV3CollectorJob.java | 251 ++++++++++++++++++ .../src/main/resources/log4j.xml | 27 ++ .../resources/xxl-job-executor.properties | 17 ++ 8 files changed, 494 insertions(+), 1 deletion(-) create mode 100644 wanhua-executor-plugins/pom.xml create mode 100644 wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/pom.xml create mode 100644 wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/src/main/java/com/wanhua/frameless/stationv3/FramelessStationV3CollectorApplication.java create mode 100644 wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/src/main/java/com/wanhua/frameless/stationv3/config/StationV3CollectorJobConfig.java create mode 100644 wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/src/main/java/com/wanhua/frameless/stationv3/jobhandler/StationV3CollectorJob.java create mode 100644 wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/src/main/resources/log4j.xml create mode 100644 wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/src/main/resources/xxl-job-executor.properties diff --git a/pom.xml b/pom.xml index ad5a2b4e..9d0058ae 100644 --- a/pom.xml +++ b/pom.xml @@ -14,7 +14,8 @@ xxl-job-core xxl-job-admin xxl-job-executor-samples - + wanhua-executor-plugins + UTF-8 diff --git a/wanhua-executor-plugins/pom.xml b/wanhua-executor-plugins/pom.xml new file mode 100644 index 00000000..9de2025e --- /dev/null +++ b/wanhua-executor-plugins/pom.xml @@ -0,0 +1,23 @@ + + + + xxl-job + com.xuxueli + 2.4.2-SNAPSHOT + + 4.0.0 + + wanhua-executor-plugins + pom + + wanhua-frameless-stationv3-collector-plugin + + + + 8 + 8 + + + \ No newline at end of file diff --git a/wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/pom.xml b/wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/pom.xml new file mode 100644 index 00000000..f5801840 --- /dev/null +++ b/wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/pom.xml @@ -0,0 +1,42 @@ + + + + wanhua-executor-plugins + com.xuxueli + 2.4.2-SNAPSHOT + + 4.0.0 + + wanhua-frameless-stationv3-collector-plugin + + + 8 + 8 + + + + + + org.slf4j + slf4j-log4j12 + ${slf4j-api.version} + + + + org.junit.jupiter + junit-jupiter-engine + ${junit-jupiter.version} + test + + + + + com.xuxueli + xxl-job-core + ${project.parent.version} + + + + \ No newline at end of file diff --git a/wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/src/main/java/com/wanhua/frameless/stationv3/FramelessStationV3CollectorApplication.java b/wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/src/main/java/com/wanhua/frameless/stationv3/FramelessStationV3CollectorApplication.java new file mode 100644 index 00000000..834fdfdc --- /dev/null +++ b/wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/src/main/java/com/wanhua/frameless/stationv3/FramelessStationV3CollectorApplication.java @@ -0,0 +1,38 @@ +package com.wanhua.frameless.stationv3; + +import com.wanhua.frameless.stationv3.config.StationV3CollectorJobConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.TimeUnit; + +/** + * @author xuxueli 2018-10-31 19:05:43 + */ +public class FramelessStationV3CollectorApplication { + private static Logger logger = LoggerFactory.getLogger(FramelessStationV3CollectorApplication.class); + + public static void main(String[] args) { + + try { + // start + StationV3CollectorJobConfig.getInstance().initXxlJobExecutor(); + + // Blocks until interrupted + while (true) { + try { + TimeUnit.HOURS.sleep(1); + } catch (InterruptedException e) { + break; + } + } + } catch (Exception e) { + logger.error(e.getMessage(), e); + } finally { + // destroy + StationV3CollectorJobConfig.getInstance().destroyXxlJobExecutor(); + } + + } + +} diff --git a/wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/src/main/java/com/wanhua/frameless/stationv3/config/StationV3CollectorJobConfig.java b/wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/src/main/java/com/wanhua/frameless/stationv3/config/StationV3CollectorJobConfig.java new file mode 100644 index 00000000..87a38807 --- /dev/null +++ b/wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/src/main/java/com/wanhua/frameless/stationv3/config/StationV3CollectorJobConfig.java @@ -0,0 +1,94 @@ +package com.wanhua.frameless.stationv3.config; + + +import com.xxl.job.core.executor.impl.XxlJobSimpleExecutor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import com.wanhua.frameless.stationv3.jobhandler.StationV3CollectorJob; + +import java.io.IOException; +import java.io.InputStreamReader; +import java.util.Arrays; +import java.util.Properties; + +/** + * @author xuxueli 2018-10-31 19:05:43 + */ +public class StationV3CollectorJobConfig { + private static Logger logger = LoggerFactory.getLogger(StationV3CollectorJobConfig.class); + + + private static StationV3CollectorJobConfig instance = new StationV3CollectorJobConfig(); + public static StationV3CollectorJobConfig getInstance() { + return instance; + } + + + private XxlJobSimpleExecutor xxlJobExecutor = null; + + /** + * init + */ + public void initXxlJobExecutor() { + + // load executor prop + Properties xxlJobProp = loadProperties("xxl-job-executor.properties"); + + // init executor + xxlJobExecutor = new XxlJobSimpleExecutor(); + xxlJobExecutor.setAdminAddresses(xxlJobProp.getProperty("xxl.job.admin.addresses")); + xxlJobExecutor.setAccessToken(xxlJobProp.getProperty("xxl.job.accessToken")); + xxlJobExecutor.setAppname(xxlJobProp.getProperty("xxl.job.executor.appname")); + xxlJobExecutor.setAddress(xxlJobProp.getProperty("xxl.job.executor.address")); + xxlJobExecutor.setIp(xxlJobProp.getProperty("xxl.job.executor.ip")); + xxlJobExecutor.setPort(Integer.valueOf(xxlJobProp.getProperty("xxl.job.executor.port"))); + xxlJobExecutor.setLogPath(xxlJobProp.getProperty("xxl.job.executor.logpath")); + xxlJobExecutor.setLogRetentionDays(Integer.valueOf(xxlJobProp.getProperty("xxl.job.executor.logretentiondays"))); + + // registry job bean + xxlJobExecutor.setXxlJobBeanList(Arrays.asList(new StationV3CollectorJob())); + + // start executor + try { + xxlJobExecutor.start(); + } catch (Exception e) { + logger.error(e.getMessage(), e); + } + } + + /** + * destroy + */ + public void destroyXxlJobExecutor() { + if (xxlJobExecutor != null) { + xxlJobExecutor.destroy(); + } + } + + + public static Properties loadProperties(String propertyFileName) { + InputStreamReader in = null; + try { + ClassLoader loder = Thread.currentThread().getContextClassLoader(); + + in = new InputStreamReader(loder.getResourceAsStream(propertyFileName), "UTF-8");; + if (in != null) { + Properties prop = new Properties(); + prop.load(in); + return prop; + } + } catch (IOException e) { + logger.error("load {} error!", propertyFileName); + } finally { + if (in != null) { + try { + in.close(); + } catch (IOException e) { + logger.error("close {} error!", propertyFileName); + } + } + } + return null; + } + +} diff --git a/wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/src/main/java/com/wanhua/frameless/stationv3/jobhandler/StationV3CollectorJob.java b/wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/src/main/java/com/wanhua/frameless/stationv3/jobhandler/StationV3CollectorJob.java new file mode 100644 index 00000000..fc66ffa7 --- /dev/null +++ b/wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/src/main/java/com/wanhua/frameless/stationv3/jobhandler/StationV3CollectorJob.java @@ -0,0 +1,251 @@ +package com.wanhua.frameless.stationv3.jobhandler; + +import com.xxl.job.core.context.XxlJobHelper; +import com.xxl.job.core.handler.annotation.XxlJob; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedInputStream; +import java.io.BufferedReader; +import java.io.DataOutputStream; +import java.io.InputStreamReader; +import java.net.HttpURLConnection; +import java.net.URL; +import java.util.Arrays; +import java.util.concurrent.TimeUnit; + +/** + * XxlJob开发示例(Bean模式) + * + * 开发步骤: + * 1、任务开发:在Spring Bean实例中,开发Job方法; + * 2、注解配置:为Job方法添加注解 "@XxlJob(value="自定义jobhandler名称", init = "JobHandler初始化方法", destroy = "JobHandler销毁方法")",注解value值对应的是调度中心新建任务的JobHandler属性的值。 + * 3、执行日志:需要通过 "XxlJobHelper.log" 打印执行日志; + * 4、任务结果:默认任务结果为 "成功" 状态,不需要主动设置;如有诉求,比如设置任务结果为失败,可以通过 "XxlJobHelper.handleFail/handleSuccess" 自主设置任务结果; + * + * @author xuxueli 2019-12-11 21:52:51 + */ +public class StationV3CollectorJob { + private static Logger logger = LoggerFactory.getLogger(StationV3CollectorJob.class); + + + /** + * 1、简单任务示例(Bean模式) + */ + @XxlJob("demoJobHandler") + public void demoJobHandler() throws Exception { + XxlJobHelper.log("XXL-JOB, Hello World."); + + for (int i = 0; i < 5; i++) { + XxlJobHelper.log("beat at:" + i); + TimeUnit.SECONDS.sleep(2); + } + // default success + } + + + /** + * 2、分片广播任务 + */ + @XxlJob("shardingJobHandler") + public void shardingJobHandler() throws Exception { + + // 分片参数 + int shardIndex = XxlJobHelper.getShardIndex(); + int shardTotal = XxlJobHelper.getShardTotal(); + + XxlJobHelper.log("分片参数:当前分片序号 = {}, 总分片数 = {}", shardIndex, shardTotal); + + // 业务逻辑 + for (int i = 0; i < shardTotal; i++) { + if (i == shardIndex) { + XxlJobHelper.log("第 {} 片, 命中分片开始处理", i); + } else { + XxlJobHelper.log("第 {} 片, 忽略", i); + } + } + + } + + + /** + * 3、命令行任务 + */ + @XxlJob("commandJobHandler") + public void commandJobHandler() throws Exception { + String command = XxlJobHelper.getJobParam(); + int exitValue = -1; + + BufferedReader bufferedReader = null; + try { + // command process + ProcessBuilder processBuilder = new ProcessBuilder(); + processBuilder.command(command); + processBuilder.redirectErrorStream(true); + + Process process = processBuilder.start(); + //Process process = Runtime.getRuntime().exec(command); + + BufferedInputStream bufferedInputStream = new BufferedInputStream(process.getInputStream()); + bufferedReader = new BufferedReader(new InputStreamReader(bufferedInputStream)); + + // command log + String line; + while ((line = bufferedReader.readLine()) != null) { + XxlJobHelper.log(line); + } + + // command exit + process.waitFor(); + exitValue = process.exitValue(); + } catch (Exception e) { + XxlJobHelper.log(e); + } finally { + if (bufferedReader != null) { + bufferedReader.close(); + } + } + + if (exitValue == 0) { + // default success + } else { + XxlJobHelper.handleFail("command exit value("+exitValue+") is failed"); + } + + } + + + /** + * 4、跨平台Http任务 + * 参数示例: + * "url: http://www.baidu.com\n" + + * "method: get\n" + + * "data: content\n"; + */ + @XxlJob("httpJobHandler") + public void httpJobHandler() throws Exception { + + // param parse + String param = XxlJobHelper.getJobParam(); + if (param==null || param.trim().length()==0) { + XxlJobHelper.log("param["+ param +"] invalid."); + + XxlJobHelper.handleFail(); + return; + } + + String[] httpParams = param.split("\n"); + String url = null; + String method = null; + String data = null; + for (String httpParam: httpParams) { + if (httpParam.startsWith("url:")) { + url = httpParam.substring(httpParam.indexOf("url:") + 4).trim(); + } + if (httpParam.startsWith("method:")) { + method = httpParam.substring(httpParam.indexOf("method:") + 7).trim().toUpperCase(); + } + if (httpParam.startsWith("data:")) { + data = httpParam.substring(httpParam.indexOf("data:") + 5).trim(); + } + } + + // param valid + if (url==null || url.trim().length()==0) { + XxlJobHelper.log("url["+ url +"] invalid."); + + XxlJobHelper.handleFail(); + return; + } + if (method==null || !Arrays.asList("GET", "POST").contains(method)) { + XxlJobHelper.log("method["+ method +"] invalid."); + + XxlJobHelper.handleFail(); + return; + } + boolean isPostMethod = method.equals("POST"); + + // request + HttpURLConnection connection = null; + BufferedReader bufferedReader = null; + try { + // connection + URL realUrl = new URL(url); + connection = (HttpURLConnection) realUrl.openConnection(); + + // connection setting + connection.setRequestMethod(method); + connection.setDoOutput(isPostMethod); + connection.setDoInput(true); + connection.setUseCaches(false); + connection.setReadTimeout(5 * 1000); + connection.setConnectTimeout(3 * 1000); + connection.setRequestProperty("connection", "Keep-Alive"); + connection.setRequestProperty("Content-Type", "application/json;charset=UTF-8"); + connection.setRequestProperty("Accept-Charset", "application/json;charset=UTF-8"); + + // do connection + connection.connect(); + + // data + if (isPostMethod && data!=null && data.trim().length()>0) { + DataOutputStream dataOutputStream = new DataOutputStream(connection.getOutputStream()); + dataOutputStream.write(data.getBytes("UTF-8")); + dataOutputStream.flush(); + dataOutputStream.close(); + } + + // valid StatusCode + int statusCode = connection.getResponseCode(); + if (statusCode != 200) { + throw new RuntimeException("Http Request StatusCode(" + statusCode + ") Invalid."); + } + + // result + bufferedReader = new BufferedReader(new InputStreamReader(connection.getInputStream(), "UTF-8")); + StringBuilder result = new StringBuilder(); + String line; + while ((line = bufferedReader.readLine()) != null) { + result.append(line); + } + String responseMsg = result.toString(); + + XxlJobHelper.log(responseMsg); + + return; + } catch (Exception e) { + XxlJobHelper.log(e); + + XxlJobHelper.handleFail(); + return; + } finally { + try { + if (bufferedReader != null) { + bufferedReader.close(); + } + if (connection != null) { + connection.disconnect(); + } + } catch (Exception e2) { + XxlJobHelper.log(e2); + } + } + + } + + /** + * 5、生命周期任务示例:任务初始化与销毁时,支持自定义相关逻辑; + */ + @XxlJob(value = "demoJobHandler2", init = "init", destroy = "destroy") + public void demoJobHandler2() throws Exception { + XxlJobHelper.log("XXL-JOB, Hello World."); + } + public void init(){ + logger.info("init"); + } + public void destroy(){ + logger.info("destroy"); + } + + +} diff --git a/wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/src/main/resources/log4j.xml b/wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/src/main/resources/log4j.xml new file mode 100644 index 00000000..896517e2 --- /dev/null +++ b/wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/src/main/resources/log4j.xml @@ -0,0 +1,27 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/src/main/resources/xxl-job-executor.properties b/wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/src/main/resources/xxl-job-executor.properties new file mode 100644 index 00000000..d21f35cb --- /dev/null +++ b/wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/src/main/resources/xxl-job-executor.properties @@ -0,0 +1,17 @@ +### xxl-job admin address list, such as "http://address" or "http://address01,http://address02" +xxl.job.admin.addresses=https://10.201.1.101:8082/internal/etl/xxl-job-admin + +### xxl-job, access token +xxl.job.accessToken=default_token + +### xxl-job executor appname +xxl.job.executor.appname=gsmsv3_collect +### xxl-job executor registry-address: default use address to registry , otherwise use ip:port if address is null +xxl.job.executor.address= +### xxl-job executor server-info +xxl.job.executor.ip= +xxl.job.executor.port=19998 +### xxl-job executor log-path +xxl.job.executor.logpath=logs/xxl-job/jobhandler +### xxl-job executor log-retention-days +xxl.job.executor.logretentiondays=30 \ No newline at end of file From c08131fff8f4081a1c9ec51b7bcee532c3435fa4 Mon Sep 17 00:00:00 2001 From: xieke Date: Tue, 17 Sep 2024 22:03:37 +0800 Subject: [PATCH 03/12] fix pom and confgiuration in executors --- .../pom.xml | 45 +++++++++++++++++++ .../src/main/assembly/assembly.xml | 45 +++++++++++++++++++ .../src/main/resources/log4j.xml | 2 +- .../resources/xxl-job-executor.properties | 4 +- 4 files changed, 93 insertions(+), 3 deletions(-) create mode 100644 wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/src/main/assembly/assembly.xml diff --git a/wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/pom.xml b/wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/pom.xml index f5801840..b929b2a5 100644 --- a/wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/pom.xml +++ b/wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/pom.xml @@ -39,4 +39,49 @@ + + + + + + org.apache.maven.plugins + maven-jar-plugin + + + + false + + + true + + lib/ + + com.wanhua.frameless.stationv3.FramelessStationV3CollectorApplication + + + + + + + + org.apache.maven.plugins + maven-assembly-plugin + + + + src/main/assembly/assembly.xml + + + + + make-assembly + package + + single + + + + + + \ No newline at end of file diff --git a/wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/src/main/assembly/assembly.xml b/wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/src/main/assembly/assembly.xml new file mode 100644 index 00000000..9d27028d --- /dev/null +++ b/wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/src/main/assembly/assembly.xml @@ -0,0 +1,45 @@ + + release + + zip + + false + + + + + + + + + + + + + + + + + + false + lib + false + + + + + + ${project.basedir}/src/main/resources + conf + + + ${project.build.directory} + + + *.jar + + + + \ No newline at end of file diff --git a/wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/src/main/resources/log4j.xml b/wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/src/main/resources/log4j.xml index 896517e2..da3a9721 100644 --- a/wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/src/main/resources/log4j.xml +++ b/wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/src/main/resources/log4j.xml @@ -10,7 +10,7 @@ - + diff --git a/wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/src/main/resources/xxl-job-executor.properties b/wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/src/main/resources/xxl-job-executor.properties index d21f35cb..26f78283 100644 --- a/wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/src/main/resources/xxl-job-executor.properties +++ b/wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/src/main/resources/xxl-job-executor.properties @@ -1,11 +1,11 @@ ### xxl-job admin address list, such as "http://address" or "http://address01,http://address02" -xxl.job.admin.addresses=https://10.201.1.101:8082/internal/etl/xxl-job-admin +xxl.job.admin.addresses=http://10.201.1.101:18080/internal/etl/xxl-job-admin ### xxl-job, access token xxl.job.accessToken=default_token ### xxl-job executor appname -xxl.job.executor.appname=gsmsv3_collect +xxl.job.executor.appname=gsmsv3-collect ### xxl-job executor registry-address: default use address to registry , otherwise use ip:port if address is null xxl.job.executor.address= ### xxl-job executor server-info From 2f2df724999f132f9fcf6b37f6fd0cea94564041 Mon Sep 17 00:00:00 2001 From: xieke Date: Wed, 18 Sep 2024 19:03:53 +0800 Subject: [PATCH 04/12] =?UTF-8?q?=E6=94=AF=E6=8C=81nginx=E5=90=8E=E9=83=A8?= =?UTF-8?q?=E7=BD=B2executor?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../src/main/resources/xxl-job-executor.properties | 4 ++-- .../main/resources/templates/common/common.macro.ftl | 2 +- .../com/xxl/job/core/biz/client/ExecutorBizClient.java | 10 +++++----- .../main/java/com/xxl/job/core/server/EmbedServer.java | 7 ++++++- 4 files changed, 14 insertions(+), 9 deletions(-) diff --git a/wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/src/main/resources/xxl-job-executor.properties b/wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/src/main/resources/xxl-job-executor.properties index 26f78283..eed99cda 100644 --- a/wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/src/main/resources/xxl-job-executor.properties +++ b/wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/src/main/resources/xxl-job-executor.properties @@ -1,5 +1,5 @@ ### xxl-job admin address list, such as "http://address" or "http://address01,http://address02" -xxl.job.admin.addresses=http://10.201.1.101:18080/internal/etl/xxl-job-admin +xxl.job.admin.addresses=https://10.201.1.101:8082/internal/etl/xxl-job-admin ### xxl-job, access token xxl.job.accessToken=default_token @@ -7,7 +7,7 @@ xxl.job.accessToken=default_token ### xxl-job executor appname xxl.job.executor.appname=gsmsv3-collect ### xxl-job executor registry-address: default use address to registry , otherwise use ip:port if address is null -xxl.job.executor.address= +xxl.job.executor.address=https://10.201.1.101:8082 ### xxl-job executor server-info xxl.job.executor.ip= xxl.job.executor.port=19998 diff --git a/xxl-job-admin/src/main/resources/templates/common/common.macro.ftl b/xxl-job-admin/src/main/resources/templates/common/common.macro.ftl index 89217d2e..a52c7a4f 100644 --- a/xxl-job-admin/src/main/resources/templates/common/common.macro.ftl +++ b/xxl-job-admin/src/main/resources/templates/common/common.macro.ftl @@ -59,7 +59,7 @@ diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/biz/client/ExecutorBizClient.java b/xxl-job-core/src/main/java/com/xxl/job/core/biz/client/ExecutorBizClient.java index 9f594309..a8ea80a9 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/biz/client/ExecutorBizClient.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/biz/client/ExecutorBizClient.java @@ -30,27 +30,27 @@ public class ExecutorBizClient implements ExecutorBiz { @Override public ReturnT beat() { - return XxlJobRemotingUtil.postBody(addressUrl+"beat", accessToken, timeout, "", String.class); + return XxlJobRemotingUtil.postBody(addressUrl+"internal/etl/wanhua/executor/beat", accessToken, timeout, "", String.class); } @Override public ReturnT idleBeat(IdleBeatParam idleBeatParam){ - return XxlJobRemotingUtil.postBody(addressUrl+"idleBeat", accessToken, timeout, idleBeatParam, String.class); + return XxlJobRemotingUtil.postBody(addressUrl+"internal/etl/wanhua/executor/idleBeat", accessToken, timeout, idleBeatParam, String.class); } @Override public ReturnT run(TriggerParam triggerParam) { - return XxlJobRemotingUtil.postBody(addressUrl + "run", accessToken, timeout, triggerParam, String.class); + return XxlJobRemotingUtil.postBody(addressUrl + "internal/etl/wanhua/executor/run", accessToken, timeout, triggerParam, String.class); } @Override public ReturnT kill(KillParam killParam) { - return XxlJobRemotingUtil.postBody(addressUrl + "kill", accessToken, timeout, killParam, String.class); + return XxlJobRemotingUtil.postBody(addressUrl + "internal/etl/wanhua/executor/kill", accessToken, timeout, killParam, String.class); } @Override public ReturnT log(LogParam logParam) { - return XxlJobRemotingUtil.postBody(addressUrl + "log", accessToken, timeout, logParam, LogResult.class); + return XxlJobRemotingUtil.postBody(addressUrl + "internal/etl/wanhua/executor/log", accessToken, timeout, logParam, LogResult.class); } } diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/server/EmbedServer.java b/xxl-job-core/src/main/java/com/xxl/job/core/server/EmbedServer.java index 540e0ea2..89f03693 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/server/EmbedServer.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/server/EmbedServer.java @@ -145,7 +145,12 @@ public class EmbedServer { // request parse //final byte[] requestBytes = ByteBufUtil.getBytes(msg.content()); // byteBuf.toString(io.netty.util.CharsetUtil.UTF_8); String requestData = msg.content().toString(CharsetUtil.UTF_8); - String uri = msg.uri(); + String fullUri = msg.uri(); + if (fullUri.startsWith("/internal/etl/wanhua/executor")) { + // 去掉前面的这一段 + fullUri = fullUri.substring("/internal/etl/wanhua/executor".length()); + } + String uri = fullUri; HttpMethod httpMethod = msg.method(); boolean keepAlive = HttpUtil.isKeepAlive(msg); String accessTokenReq = msg.headers().get(XxlJobRemotingUtil.XXL_JOB_ACCESS_TOKEN); From 8b34279deea241025e328ae24355bcbc93e9fba7 Mon Sep 17 00:00:00 2001 From: Josez <1123010482@qq.com> Date: Fri, 20 Sep 2024 11:23:57 +0800 Subject: [PATCH 05/12] =?UTF-8?q?lng=E6=97=A5=E6=8A=A5=E6=95=B0=E6=8D=AE?= =?UTF-8?q?=E6=8B=89=E5=8F=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../pom.xml | 16 +- .../config/StationV3CollectorJobConfig.java | 3 + .../stationv3/jobhandler/LngCollectorJob.java | 159 ++++++++++++++++++ 3 files changed, 176 insertions(+), 2 deletions(-) create mode 100644 wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/src/main/java/com/wanhua/frameless/stationv3/jobhandler/LngCollectorJob.java diff --git a/wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/pom.xml b/wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/pom.xml index b929b2a5..31cb9085 100644 --- a/wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/pom.xml +++ b/wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/pom.xml @@ -20,7 +20,7 @@ org.slf4j - slf4j-log4j12 + slf4j-reload4j ${slf4j-api.version} @@ -38,6 +38,18 @@ ${project.parent.version} + + com.wanhua + stationv3-http-data-collector + 1.0-SNAPSHOT + + + + com.wanhua + stationv3-base-dao-service + 1.0-SNAPSHOT + + @@ -84,4 +96,4 @@ - \ No newline at end of file + diff --git a/wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/src/main/java/com/wanhua/frameless/stationv3/config/StationV3CollectorJobConfig.java b/wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/src/main/java/com/wanhua/frameless/stationv3/config/StationV3CollectorJobConfig.java index 87a38807..31048816 100644 --- a/wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/src/main/java/com/wanhua/frameless/stationv3/config/StationV3CollectorJobConfig.java +++ b/wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/src/main/java/com/wanhua/frameless/stationv3/config/StationV3CollectorJobConfig.java @@ -1,6 +1,7 @@ package com.wanhua.frameless.stationv3.config; +import com.wanhua.frameless.stationv3.jobhandler.LngCollectorJob; import com.xxl.job.core.executor.impl.XxlJobSimpleExecutor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,6 +49,8 @@ public class StationV3CollectorJobConfig { // registry job bean xxlJobExecutor.setXxlJobBeanList(Arrays.asList(new StationV3CollectorJob())); + xxlJobExecutor.setXxlJobBeanList(Arrays.asList(new LngCollectorJob())); + // start executor try { xxlJobExecutor.start(); diff --git a/wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/src/main/java/com/wanhua/frameless/stationv3/jobhandler/LngCollectorJob.java b/wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/src/main/java/com/wanhua/frameless/stationv3/jobhandler/LngCollectorJob.java new file mode 100644 index 00000000..4f4f359b --- /dev/null +++ b/wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/src/main/java/com/wanhua/frameless/stationv3/jobhandler/LngCollectorJob.java @@ -0,0 +1,159 @@ +package com.wanhua.frameless.stationv3.jobhandler; + +import com.alibaba.fastjson.JSONArray; +import com.alibaba.fastjson.JSONObject; +import com.wanhua.lng.daySale.model.SaleDaily; +import com.wanhua.lng.daySale.model.SaleDailyInfo; +import com.wanhua.lng.daySale.service.impl.SaleDailyInfoServiceImpl; +import com.wanhua.lng.daySale.service.impl.SaleDailyServiceImpl; +import com.wanhua.stationservice.baseDaoService.service.impl.PumpServiceImpl; +import com.wanhua.stationservice.baseutils.security.CommonBasedAESUtil; +import com.wanhua.stationv3.data.collector.CollectorTest; +import com.wanhua.stationv3.data.collector.LngCollector; +import com.wanhua.stationv3.data.collector.TokenUtil; +import com.xxl.job.core.context.XxlJobHelper; +import com.xxl.job.core.handler.annotation.XxlJob; +import io.netty.util.internal.StringUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.LocalDate; +import java.time.format.DateTimeFormatter; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +/** + * XxlJob开发示例(Bean模式) + * + * 开发步骤: + * 1、任务开发:在Spring Bean实例中,开发Job方法; + * 2、注解配置:为Job方法添加注解 "@XxlJob(value="自定义jobhandler名称", init = "JobHandler初始化方法", destroy = "JobHandler销毁方法")",注解value值对应的是调度中心新建任务的JobHandler属性的值。 + * 3、执行日志:需要通过 "XxlJobHelper.log" 打印执行日志; + * 4、任务结果:默认任务结果为 "成功" 状态,不需要主动设置;如有诉求,比如设置任务结果为失败,可以通过 "XxlJobHelper.handleFail/handleSuccess" 自主设置任务结果; + * + * @author xuxueli 2019-12-11 21:52:51 + */ +public class LngCollectorJob { + private static Logger logger = LoggerFactory.getLogger(LngCollectorJob.class); + + SaleDailyServiceImpl dailyServiceImpl = new SaleDailyServiceImpl(); + SaleDailyInfoServiceImpl dailyInfoServiceImpl = new SaleDailyInfoServiceImpl(); + + + /** + * 1、简单任务示例(Bean模式) + */ + @XxlJob("getSaleDaily") + public void demoJobHandler() throws Exception { + XxlJobHelper.log("XXL-JOB,begin start "); + + String param = XxlJobHelper.getJobParam(); + Map params = JSONObject.parseObject(param,Map.class); + //{"time":"2024-01-01/2024-01-03","code":"1-A6201-C001-S085,1-A6201-C001-S084"} + LocalDate yesterday = LocalDate.now().minusDays(1); + DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd"); + String formattedDate = yesterday.format(formatter); + + String code = params.get("code"); + + String fullToken = TokenUtil.getTokenFromServer("xieke"); + XxlJobHelper.log("token: "+ fullToken); + String secondsFormat = fullToken.substring(0,15); + Long sinceSeconds = Long.parseLong(secondsFormat); + XxlJobHelper.log("since seconds: "+ sinceSeconds); + String tokenFormat = fullToken.substring(15); + String newToken = CommonBasedAESUtil.decrypt("12345678900abcde", "78945612300abcde", tokenFormat); + newToken = newToken.substring(0, newToken.indexOf("SALT1234567890")); + XxlJobHelper.log("realtoken "+ newToken); + String token = newToken; + + + if(!StringUtil.isNullOrEmpty(params.get("time"))){ + String times = params.get("time"); + String start = times.split("/")[0]; + String end = times.split("/")[1]; + + + }else{ + String[] codes = code.split(","); + for (String codeParam : codes){ + XxlJobHelper.log("start get data:"+codeParam+",time:"+formattedDate); + Map dataMap = LngCollector.getTankGunInfo(codeParam,formattedDate,token); + + String businessDay = dataMap.get("businessDay").toString(); + + // 期初库存、本期进货、加气机发出量、数量、单价、本期损溢处理、期末库存 + JSONArray gasSoldList =(JSONArray) dataMap.get("gasSoldList"); + + + + if(gasSoldList.size()>0){ + JSONObject sold = gasSoldList.getJSONObject(0); + SaleDaily daily = new SaleDaily(); + daily.setBusinessDay(businessDay); + daily.setStationCode(code); + daily.setStartVolume(sold.get("startVolume").toString()); + daily.setEndVolume(sold.get("endVolume").toString()); + daily.setPurchaseVolume(sold.get("purchaseVolume").toString()); + daily.setIssuedVolume(sold.get("issuedVolume").toString()); + daily.setProfitLossVolume(sold.get("profitLossVolume").toString()); + + JSONArray gasSoldItemList = sold.getJSONArray("gasSoldItemList"); + if(gasSoldItemList.size()>0){ + JSONObject gasSold = gasSoldItemList.getJSONObject(0); + daily.setUnitPrice(gasSold.get("unitPrice").toString()); + daily.setSalesVolume(gasSold.get("salesVolume").toString()); + } + + dailyServiceImpl.saveOne(daily); + } + + + //不同支付类型销售数据统计 + JSONArray gasPaymentList =(JSONArray) dataMap.get("gasPaymentList"); + if(gasSoldList.size()>0){ +// for(Object info : gasPaymentList) { +// JSONObject sold = (JSONObject) JSONObject.toJSON(info); +// SaleDailyInfo daily = new SaleDailyInfo(); +// daily.setBusinessDay(businessDay); +// daily.setStationCode(code); +// daily.setUnitPrice(sold.get("unitPrice")); +// daily.setEndVolume(sold.get("endVolume").toString()); +// daily.setPurchaseVolume(sold.get("purchaseVolume").toString()); +// daily.setIssuedVolume(sold.get("issuedVolume").toString()); +// daily.setProfitLossVolume(sold.get("profitLossVolume").toString()); +// +// JSONArray gasSoldItemList = sold.getJSONArray("gasSoldItemList"); +// if (gasSoldItemList.size() > 0) { +// JSONObject gasSold = gasSoldItemList.getJSONObject(0); +// daily.setUnitPrice(gasSold.get("unitPrice").toString()); +// daily.setSalesVolume(gasSold.get("salesVolume").toString()); +// } +// +// dailyInfoServiceImpl.saveOne(daily); +// } + } + XxlJobHelper.log("start get data:"+codeParam+" end"); + } + } + + + } + + + /** + * 5、生命周期任务示例:任务初始化与销毁时,支持自定义相关逻辑; + */ + @XxlJob(value = "demoJobHandler2", init = "init", destroy = "destroy") + public void demoJobHandler2() throws Exception { + XxlJobHelper.log("XXL-JOB, Hello World."); + } + public void init(){ + logger.info("init"); + } + public void destroy(){ + logger.info("destroy"); + } + + +} From 60add07dae85686870e604a7142f452b2d337999 Mon Sep 17 00:00:00 2001 From: xieke Date: Fri, 20 Sep 2024 13:45:30 +0800 Subject: [PATCH 06/12] =?UTF-8?q?exclude=E6=8E=89=E5=BC=95=E5=85=A5?= =?UTF-8?q?=E7=9A=84slf4j-api,station-common=E4=B8=AD=E7=9A=84=E7=89=88?= =?UTF-8?q?=E6=9C=AC=E6=98=AF1.7=E4=B8=8Exxl=E4=B8=AD=E7=9A=84=E7=89=88?= =?UTF-8?q?=E6=9C=AC=E6=98=AF2.03?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../pom.xml | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/pom.xml b/wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/pom.xml index 31cb9085..9247cff6 100644 --- a/wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/pom.xml +++ b/wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/pom.xml @@ -42,12 +42,24 @@ com.wanhua stationv3-http-data-collector 1.0-SNAPSHOT + + + org.slf4j + slf4j-api + + com.wanhua stationv3-base-dao-service 1.0-SNAPSHOT + + + org.slf4j + slf4j-api + + From 52a74f362b2eb769a8f601496c6b87494c26dacc Mon Sep 17 00:00:00 2001 From: xieke Date: Mon, 23 Sep 2024 16:49:11 +0800 Subject: [PATCH 07/12] fix pom --- xxl-job-core/pom.xml | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/xxl-job-core/pom.xml b/xxl-job-core/pom.xml index a363344d..df93c522 100644 --- a/xxl-job-core/pom.xml +++ b/xxl-job-core/pom.xml @@ -7,7 +7,7 @@ 2.4.2-SNAPSHOT xxl-job-core - jar + pom ${project.artifactId} A distributed task scheduling framework. @@ -60,5 +60,16 @@ - + + + fdrepo + Release + https://public.hidotcom.cn/libabc/repository/fdrepo/ + + + fdrepo + Snapshot + https://public.hidotcom.cn/libabc/repository/fdrepo/ + + \ No newline at end of file From eb2ed78669ce89a5b11b373d499ece24e1fda514 Mon Sep 17 00:00:00 2001 From: xieke Date: Thu, 26 Sep 2024 09:05:52 +0800 Subject: [PATCH 08/12] qp plugin init --- .../src/main/assembly/assembly.xml | 2 +- .../stationv3/config/StationV3CollectorJobConfig.java | 4 +--- .../src/main/resources/xxl-job-executor.properties | 6 +++--- .../src/main/resources/templates/common/common.macro.ftl | 3 +-- 4 files changed, 6 insertions(+), 9 deletions(-) diff --git a/wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/src/main/assembly/assembly.xml b/wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/src/main/assembly/assembly.xml index 9d27028d..9fd11c53 100644 --- a/wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/src/main/assembly/assembly.xml +++ b/wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/src/main/assembly/assembly.xml @@ -32,7 +32,7 @@ ${project.basedir}/src/main/resources - conf + config ${project.build.directory} diff --git a/wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/src/main/java/com/wanhua/frameless/stationv3/config/StationV3CollectorJobConfig.java b/wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/src/main/java/com/wanhua/frameless/stationv3/config/StationV3CollectorJobConfig.java index 31048816..ec693006 100644 --- a/wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/src/main/java/com/wanhua/frameless/stationv3/config/StationV3CollectorJobConfig.java +++ b/wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/src/main/java/com/wanhua/frameless/stationv3/config/StationV3CollectorJobConfig.java @@ -47,9 +47,7 @@ public class StationV3CollectorJobConfig { xxlJobExecutor.setLogRetentionDays(Integer.valueOf(xxlJobProp.getProperty("xxl.job.executor.logretentiondays"))); // registry job bean - xxlJobExecutor.setXxlJobBeanList(Arrays.asList(new StationV3CollectorJob())); - - xxlJobExecutor.setXxlJobBeanList(Arrays.asList(new LngCollectorJob())); + xxlJobExecutor.setXxlJobBeanList(Arrays.asList(new StationV3CollectorJob(), new LngCollectorJob())); // start executor try { diff --git a/wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/src/main/resources/xxl-job-executor.properties b/wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/src/main/resources/xxl-job-executor.properties index eed99cda..d5670185 100644 --- a/wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/src/main/resources/xxl-job-executor.properties +++ b/wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/src/main/resources/xxl-job-executor.properties @@ -1,13 +1,13 @@ ### xxl-job admin address list, such as "http://address" or "http://address01,http://address02" -xxl.job.admin.addresses=https://10.201.1.101:8082/internal/etl/xxl-job-admin +xxl.job.admin.addresses=http://172.22.62.105/internal/etl/xxl-job-admin ### xxl-job, access token xxl.job.accessToken=default_token ### xxl-job executor appname -xxl.job.executor.appname=gsmsv3-collect +xxl.job.executor.appname=qpsy-interbase2 ### xxl-job executor registry-address: default use address to registry , otherwise use ip:port if address is null -xxl.job.executor.address=https://10.201.1.101:8082 +xxl.job.executor.address=http://172.22.62.105 ### xxl-job executor server-info xxl.job.executor.ip= xxl.job.executor.port=19998 diff --git a/xxl-job-admin/src/main/resources/templates/common/common.macro.ftl b/xxl-job-admin/src/main/resources/templates/common/common.macro.ftl index a52c7a4f..aace849f 100644 --- a/xxl-job-admin/src/main/resources/templates/common/common.macro.ftl +++ b/xxl-job-admin/src/main/resources/templates/common/common.macro.ftl @@ -58,8 +58,7 @@ <#-- common --> From 9f5257ea59e1553862492bed56db0cdbecc492b1 Mon Sep 17 00:00:00 2001 From: xieke Date: Thu, 26 Sep 2024 09:34:33 +0800 Subject: [PATCH 09/12] =?UTF-8?q?=E5=A3=B3=E7=89=8C=E5=8F=96=E6=95=B0?= =?UTF-8?q?=E6=8D=AE=E9=80=BB=E8=BE=91=E5=88=9D=E5=A7=8B=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- wanhua-executor-plugins/pom.xml | 1 + .../pom.xml | 100 +++++++ .../src/main/assembly/assembly.xml | 45 ++++ ...melessQpStationV2CollectorApplication.java | 38 +++ .../config/QpCollectorJobConfig.java | 94 +++++++ .../jobhandler/QpStationV2CollectorJob.java | 251 ++++++++++++++++++ .../src/main/resources/log4j.xml | 27 ++ .../resources/xxl-job-executor.properties | 17 ++ 8 files changed, 573 insertions(+) create mode 100644 wanhua-executor-plugins/wanhua-frameless-qp-stationv2-collector-plugin/pom.xml create mode 100644 wanhua-executor-plugins/wanhua-frameless-qp-stationv2-collector-plugin/src/main/assembly/assembly.xml create mode 100644 wanhua-executor-plugins/wanhua-frameless-qp-stationv2-collector-plugin/src/main/java/com/wanhua/frameless/qp/stationv2/FramelessQpStationV2CollectorApplication.java create mode 100644 wanhua-executor-plugins/wanhua-frameless-qp-stationv2-collector-plugin/src/main/java/com/wanhua/frameless/qp/stationv2/config/QpCollectorJobConfig.java create mode 100644 wanhua-executor-plugins/wanhua-frameless-qp-stationv2-collector-plugin/src/main/java/com/wanhua/frameless/qp/stationv2/jobhandler/QpStationV2CollectorJob.java create mode 100644 wanhua-executor-plugins/wanhua-frameless-qp-stationv2-collector-plugin/src/main/resources/log4j.xml create mode 100644 wanhua-executor-plugins/wanhua-frameless-qp-stationv2-collector-plugin/src/main/resources/xxl-job-executor.properties diff --git a/wanhua-executor-plugins/pom.xml b/wanhua-executor-plugins/pom.xml index 9de2025e..ccce2347 100644 --- a/wanhua-executor-plugins/pom.xml +++ b/wanhua-executor-plugins/pom.xml @@ -13,6 +13,7 @@ pom wanhua-frameless-stationv3-collector-plugin + wanhua-frameless-qp-stationv2-collector-plugin diff --git a/wanhua-executor-plugins/wanhua-frameless-qp-stationv2-collector-plugin/pom.xml b/wanhua-executor-plugins/wanhua-frameless-qp-stationv2-collector-plugin/pom.xml new file mode 100644 index 00000000..19974e94 --- /dev/null +++ b/wanhua-executor-plugins/wanhua-frameless-qp-stationv2-collector-plugin/pom.xml @@ -0,0 +1,100 @@ + + + + wanhua-executor-plugins + com.xuxueli + 2.4.2-SNAPSHOT + + 4.0.0 + + wanhua-frameless-qp-stationv2-collector-plugin + + + 8 + 8 + + + + + org.slf4j + slf4j-reload4j + ${slf4j-api.version} + + + + org.junit.jupiter + junit-jupiter-engine + ${junit-jupiter.version} + test + + + + + com.xuxueli + xxl-job-core + ${project.parent.version} + + + + + com.wanhua + stationv3-base-dao-service + 1.0-QP-SNAPSHOT + + + org.slf4j + slf4j-api + + + + + + + + + + + org.apache.maven.plugins + maven-jar-plugin + + + + false + + + true + + lib/ + + com.wanhua.frameless.stationv3.FramelessStationV3CollectorApplication + + + + + + + + org.apache.maven.plugins + maven-assembly-plugin + + + + src/main/assembly/assembly.xml + + + + + make-assembly + package + + single + + + + + + + + \ No newline at end of file diff --git a/wanhua-executor-plugins/wanhua-frameless-qp-stationv2-collector-plugin/src/main/assembly/assembly.xml b/wanhua-executor-plugins/wanhua-frameless-qp-stationv2-collector-plugin/src/main/assembly/assembly.xml new file mode 100644 index 00000000..9fd11c53 --- /dev/null +++ b/wanhua-executor-plugins/wanhua-frameless-qp-stationv2-collector-plugin/src/main/assembly/assembly.xml @@ -0,0 +1,45 @@ + + release + + zip + + false + + + + + + + + + + + + + + + + + + false + lib + false + + + + + + ${project.basedir}/src/main/resources + config + + + ${project.build.directory} + + + *.jar + + + + \ No newline at end of file diff --git a/wanhua-executor-plugins/wanhua-frameless-qp-stationv2-collector-plugin/src/main/java/com/wanhua/frameless/qp/stationv2/FramelessQpStationV2CollectorApplication.java b/wanhua-executor-plugins/wanhua-frameless-qp-stationv2-collector-plugin/src/main/java/com/wanhua/frameless/qp/stationv2/FramelessQpStationV2CollectorApplication.java new file mode 100644 index 00000000..066765fe --- /dev/null +++ b/wanhua-executor-plugins/wanhua-frameless-qp-stationv2-collector-plugin/src/main/java/com/wanhua/frameless/qp/stationv2/FramelessQpStationV2CollectorApplication.java @@ -0,0 +1,38 @@ +package com.wanhua.frameless.qp.stationv2; + +import com.wanhua.frameless.qp.stationv2.config.QpCollectorJobConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.TimeUnit; + +/** + * @author xuxueli 2018-10-31 19:05:43 + */ +public class FramelessQpStationV2CollectorApplication { + private static Logger logger = LoggerFactory.getLogger(FramelessQpStationV2CollectorApplication.class); + + public static void main(String[] args) { + + try { + // start + QpCollectorJobConfig.getInstance().initXxlJobExecutor(); + + // Blocks until interrupted + while (true) { + try { + TimeUnit.HOURS.sleep(1); + } catch (InterruptedException e) { + break; + } + } + } catch (Exception e) { + logger.error(e.getMessage(), e); + } finally { + // destroy + QpCollectorJobConfig.getInstance().destroyXxlJobExecutor(); + } + + } + +} diff --git a/wanhua-executor-plugins/wanhua-frameless-qp-stationv2-collector-plugin/src/main/java/com/wanhua/frameless/qp/stationv2/config/QpCollectorJobConfig.java b/wanhua-executor-plugins/wanhua-frameless-qp-stationv2-collector-plugin/src/main/java/com/wanhua/frameless/qp/stationv2/config/QpCollectorJobConfig.java new file mode 100644 index 00000000..fd3a8ba8 --- /dev/null +++ b/wanhua-executor-plugins/wanhua-frameless-qp-stationv2-collector-plugin/src/main/java/com/wanhua/frameless/qp/stationv2/config/QpCollectorJobConfig.java @@ -0,0 +1,94 @@ +package com.wanhua.frameless.qp.stationv2.config; + + +import com.wanhua.frameless.qp.stationv2.jobhandler.QpStationV2CollectorJob; +import com.xxl.job.core.executor.impl.XxlJobSimpleExecutor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.InputStreamReader; +import java.util.Arrays; +import java.util.Properties; + +/** + * @author xuxueli 2018-10-31 19:05:43 + */ +public class QpCollectorJobConfig { + private static Logger logger = LoggerFactory.getLogger(QpCollectorJobConfig.class); + + + private static QpCollectorJobConfig instance = new QpCollectorJobConfig(); + public static QpCollectorJobConfig getInstance() { + return instance; + } + + + private XxlJobSimpleExecutor xxlJobExecutor = null; + + /** + * init + */ + public void initXxlJobExecutor() { + + // load executor prop + Properties xxlJobProp = loadProperties("xxl-job-executor.properties"); + + // init executor + xxlJobExecutor = new XxlJobSimpleExecutor(); + xxlJobExecutor.setAdminAddresses(xxlJobProp.getProperty("xxl.job.admin.addresses")); + xxlJobExecutor.setAccessToken(xxlJobProp.getProperty("xxl.job.accessToken")); + xxlJobExecutor.setAppname(xxlJobProp.getProperty("xxl.job.executor.appname")); + xxlJobExecutor.setAddress(xxlJobProp.getProperty("xxl.job.executor.address")); + xxlJobExecutor.setIp(xxlJobProp.getProperty("xxl.job.executor.ip")); + xxlJobExecutor.setPort(Integer.valueOf(xxlJobProp.getProperty("xxl.job.executor.port"))); + xxlJobExecutor.setLogPath(xxlJobProp.getProperty("xxl.job.executor.logpath")); + xxlJobExecutor.setLogRetentionDays(Integer.valueOf(xxlJobProp.getProperty("xxl.job.executor.logretentiondays"))); + + // registry job bean + xxlJobExecutor.setXxlJobBeanList(Arrays.asList(new QpStationV2CollectorJob())); + + // start executor + try { + xxlJobExecutor.start(); + } catch (Exception e) { + logger.error(e.getMessage(), e); + } + } + + /** + * destroy + */ + public void destroyXxlJobExecutor() { + if (xxlJobExecutor != null) { + xxlJobExecutor.destroy(); + } + } + + + public static Properties loadProperties(String propertyFileName) { + InputStreamReader in = null; + try { + ClassLoader loder = Thread.currentThread().getContextClassLoader(); + + in = new InputStreamReader(loder.getResourceAsStream(propertyFileName), "UTF-8");; + if (in != null) { + Properties prop = new Properties(); + prop.load(in); + return prop; + } + } catch (IOException e) { + logger.error("load {} error!", propertyFileName); + } finally { + if (in != null) { + try { + in.close(); + } catch (IOException e) { + logger.error("close {} error!", propertyFileName); + } + } + } + return null; + } + +} diff --git a/wanhua-executor-plugins/wanhua-frameless-qp-stationv2-collector-plugin/src/main/java/com/wanhua/frameless/qp/stationv2/jobhandler/QpStationV2CollectorJob.java b/wanhua-executor-plugins/wanhua-frameless-qp-stationv2-collector-plugin/src/main/java/com/wanhua/frameless/qp/stationv2/jobhandler/QpStationV2CollectorJob.java new file mode 100644 index 00000000..bbd1942d --- /dev/null +++ b/wanhua-executor-plugins/wanhua-frameless-qp-stationv2-collector-plugin/src/main/java/com/wanhua/frameless/qp/stationv2/jobhandler/QpStationV2CollectorJob.java @@ -0,0 +1,251 @@ +package com.wanhua.frameless.qp.stationv2.jobhandler; + +import com.xxl.job.core.context.XxlJobHelper; +import com.xxl.job.core.handler.annotation.XxlJob; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedInputStream; +import java.io.BufferedReader; +import java.io.DataOutputStream; +import java.io.InputStreamReader; +import java.net.HttpURLConnection; +import java.net.URL; +import java.util.Arrays; +import java.util.concurrent.TimeUnit; + +/** + * XxlJob开发示例(Bean模式) + * + * 开发步骤: + * 1、任务开发:在Spring Bean实例中,开发Job方法; + * 2、注解配置:为Job方法添加注解 "@XxlJob(value="自定义jobhandler名称", init = "JobHandler初始化方法", destroy = "JobHandler销毁方法")",注解value值对应的是调度中心新建任务的JobHandler属性的值。 + * 3、执行日志:需要通过 "XxlJobHelper.log" 打印执行日志; + * 4、任务结果:默认任务结果为 "成功" 状态,不需要主动设置;如有诉求,比如设置任务结果为失败,可以通过 "XxlJobHelper.handleFail/handleSuccess" 自主设置任务结果; + * + * @author xuxueli 2019-12-11 21:52:51 + */ +public class QpStationV2CollectorJob { + private static Logger logger = LoggerFactory.getLogger(QpStationV2CollectorJob.class); + + + /** + * 1、简单任务示例(Bean模式) + */ + @XxlJob("demoJobHandler") + public void demoJobHandler() throws Exception { + XxlJobHelper.log("XXL-JOB, Hello World."); + + for (int i = 0; i < 5; i++) { + XxlJobHelper.log("beat at:" + i); + TimeUnit.SECONDS.sleep(2); + } + // default success + } + + + /** + * 2、分片广播任务 + */ + @XxlJob("shardingJobHandler") + public void shardingJobHandler() throws Exception { + + // 分片参数 + int shardIndex = XxlJobHelper.getShardIndex(); + int shardTotal = XxlJobHelper.getShardTotal(); + + XxlJobHelper.log("分片参数:当前分片序号 = {}, 总分片数 = {}", shardIndex, shardTotal); + + // 业务逻辑 + for (int i = 0; i < shardTotal; i++) { + if (i == shardIndex) { + XxlJobHelper.log("第 {} 片, 命中分片开始处理", i); + } else { + XxlJobHelper.log("第 {} 片, 忽略", i); + } + } + + } + + + /** + * 3、命令行任务 + */ + @XxlJob("commandJobHandler") + public void commandJobHandler() throws Exception { + String command = XxlJobHelper.getJobParam(); + int exitValue = -1; + + BufferedReader bufferedReader = null; + try { + // command process + ProcessBuilder processBuilder = new ProcessBuilder(); + processBuilder.command(command); + processBuilder.redirectErrorStream(true); + + Process process = processBuilder.start(); + //Process process = Runtime.getRuntime().exec(command); + + BufferedInputStream bufferedInputStream = new BufferedInputStream(process.getInputStream()); + bufferedReader = new BufferedReader(new InputStreamReader(bufferedInputStream)); + + // command log + String line; + while ((line = bufferedReader.readLine()) != null) { + XxlJobHelper.log(line); + } + + // command exit + process.waitFor(); + exitValue = process.exitValue(); + } catch (Exception e) { + XxlJobHelper.log(e); + } finally { + if (bufferedReader != null) { + bufferedReader.close(); + } + } + + if (exitValue == 0) { + // default success + } else { + XxlJobHelper.handleFail("command exit value("+exitValue+") is failed"); + } + + } + + + /** + * 4、跨平台Http任务 + * 参数示例: + * "url: http://www.baidu.com\n" + + * "method: get\n" + + * "data: content\n"; + */ + @XxlJob("httpJobHandler") + public void httpJobHandler() throws Exception { + + // param parse + String param = XxlJobHelper.getJobParam(); + if (param==null || param.trim().length()==0) { + XxlJobHelper.log("param["+ param +"] invalid."); + + XxlJobHelper.handleFail(); + return; + } + + String[] httpParams = param.split("\n"); + String url = null; + String method = null; + String data = null; + for (String httpParam: httpParams) { + if (httpParam.startsWith("url:")) { + url = httpParam.substring(httpParam.indexOf("url:") + 4).trim(); + } + if (httpParam.startsWith("method:")) { + method = httpParam.substring(httpParam.indexOf("method:") + 7).trim().toUpperCase(); + } + if (httpParam.startsWith("data:")) { + data = httpParam.substring(httpParam.indexOf("data:") + 5).trim(); + } + } + + // param valid + if (url==null || url.trim().length()==0) { + XxlJobHelper.log("url["+ url +"] invalid."); + + XxlJobHelper.handleFail(); + return; + } + if (method==null || !Arrays.asList("GET", "POST").contains(method)) { + XxlJobHelper.log("method["+ method +"] invalid."); + + XxlJobHelper.handleFail(); + return; + } + boolean isPostMethod = method.equals("POST"); + + // request + HttpURLConnection connection = null; + BufferedReader bufferedReader = null; + try { + // connection + URL realUrl = new URL(url); + connection = (HttpURLConnection) realUrl.openConnection(); + + // connection setting + connection.setRequestMethod(method); + connection.setDoOutput(isPostMethod); + connection.setDoInput(true); + connection.setUseCaches(false); + connection.setReadTimeout(5 * 1000); + connection.setConnectTimeout(3 * 1000); + connection.setRequestProperty("connection", "Keep-Alive"); + connection.setRequestProperty("Content-Type", "application/json;charset=UTF-8"); + connection.setRequestProperty("Accept-Charset", "application/json;charset=UTF-8"); + + // do connection + connection.connect(); + + // data + if (isPostMethod && data!=null && data.trim().length()>0) { + DataOutputStream dataOutputStream = new DataOutputStream(connection.getOutputStream()); + dataOutputStream.write(data.getBytes("UTF-8")); + dataOutputStream.flush(); + dataOutputStream.close(); + } + + // valid StatusCode + int statusCode = connection.getResponseCode(); + if (statusCode != 200) { + throw new RuntimeException("Http Request StatusCode(" + statusCode + ") Invalid."); + } + + // result + bufferedReader = new BufferedReader(new InputStreamReader(connection.getInputStream(), "UTF-8")); + StringBuilder result = new StringBuilder(); + String line; + while ((line = bufferedReader.readLine()) != null) { + result.append(line); + } + String responseMsg = result.toString(); + + XxlJobHelper.log(responseMsg); + + return; + } catch (Exception e) { + XxlJobHelper.log(e); + + XxlJobHelper.handleFail(); + return; + } finally { + try { + if (bufferedReader != null) { + bufferedReader.close(); + } + if (connection != null) { + connection.disconnect(); + } + } catch (Exception e2) { + XxlJobHelper.log(e2); + } + } + + } + + /** + * 5、生命周期任务示例:任务初始化与销毁时,支持自定义相关逻辑; + */ + @XxlJob(value = "demoJobHandler2", init = "init", destroy = "destroy") + public void demoJobHandler2() throws Exception { + XxlJobHelper.log("XXL-JOB, Hello World."); + } + public void init(){ + logger.info("init"); + } + public void destroy(){ + logger.info("destroy"); + } + + +} diff --git a/wanhua-executor-plugins/wanhua-frameless-qp-stationv2-collector-plugin/src/main/resources/log4j.xml b/wanhua-executor-plugins/wanhua-frameless-qp-stationv2-collector-plugin/src/main/resources/log4j.xml new file mode 100644 index 00000000..da3a9721 --- /dev/null +++ b/wanhua-executor-plugins/wanhua-frameless-qp-stationv2-collector-plugin/src/main/resources/log4j.xml @@ -0,0 +1,27 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/wanhua-executor-plugins/wanhua-frameless-qp-stationv2-collector-plugin/src/main/resources/xxl-job-executor.properties b/wanhua-executor-plugins/wanhua-frameless-qp-stationv2-collector-plugin/src/main/resources/xxl-job-executor.properties new file mode 100644 index 00000000..d5670185 --- /dev/null +++ b/wanhua-executor-plugins/wanhua-frameless-qp-stationv2-collector-plugin/src/main/resources/xxl-job-executor.properties @@ -0,0 +1,17 @@ +### xxl-job admin address list, such as "http://address" or "http://address01,http://address02" +xxl.job.admin.addresses=http://172.22.62.105/internal/etl/xxl-job-admin + +### xxl-job, access token +xxl.job.accessToken=default_token + +### xxl-job executor appname +xxl.job.executor.appname=qpsy-interbase2 +### xxl-job executor registry-address: default use address to registry , otherwise use ip:port if address is null +xxl.job.executor.address=http://172.22.62.105 +### xxl-job executor server-info +xxl.job.executor.ip= +xxl.job.executor.port=19998 +### xxl-job executor log-path +xxl.job.executor.logpath=logs/xxl-job/jobhandler +### xxl-job executor log-retention-days +xxl.job.executor.logretentiondays=30 \ No newline at end of file From 05aba0d74b6324a7d7b5e276ab898617334de021 Mon Sep 17 00:00:00 2001 From: Josez <1123010482@qq.com> Date: Thu, 26 Sep 2024 09:44:09 +0800 Subject: [PATCH 10/12] =?UTF-8?q?=E6=97=A5=E9=94=80=E5=94=AE=E6=8A=A5?= =?UTF-8?q?=E8=A1=A8=E6=8B=89=E5=8F=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../pom.xml | 6 + .../stationv3/jobhandler/LngCollectorJob.java | 147 ++++++++++++------ xxl-job-core/pom.xml | 4 +- 3 files changed, 105 insertions(+), 52 deletions(-) diff --git a/wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/pom.xml b/wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/pom.xml index 9247cff6..14a96fb6 100644 --- a/wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/pom.xml +++ b/wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/pom.xml @@ -23,6 +23,11 @@ slf4j-reload4j ${slf4j-api.version} + + org.slf4j + slf4j-api + ${slf4j-api.version} + org.junit.jupiter @@ -70,6 +75,7 @@ org.apache.maven.plugins maven-jar-plugin + 3.4.2 diff --git a/wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/src/main/java/com/wanhua/frameless/stationv3/jobhandler/LngCollectorJob.java b/wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/src/main/java/com/wanhua/frameless/stationv3/jobhandler/LngCollectorJob.java index 4f4f359b..0b6e1bb3 100644 --- a/wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/src/main/java/com/wanhua/frameless/stationv3/jobhandler/LngCollectorJob.java +++ b/wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/src/main/java/com/wanhua/frameless/stationv3/jobhandler/LngCollectorJob.java @@ -6,6 +6,7 @@ import com.wanhua.lng.daySale.model.SaleDaily; import com.wanhua.lng.daySale.model.SaleDailyInfo; import com.wanhua.lng.daySale.service.impl.SaleDailyInfoServiceImpl; import com.wanhua.lng.daySale.service.impl.SaleDailyServiceImpl; +import com.wanhua.lng.daySale.utils.PaymentMethod; import com.wanhua.stationservice.baseDaoService.service.impl.PumpServiceImpl; import com.wanhua.stationservice.baseutils.security.CommonBasedAESUtil; import com.wanhua.stationv3.data.collector.CollectorTest; @@ -19,6 +20,8 @@ import org.slf4j.LoggerFactory; import java.time.LocalDate; import java.time.format.DateTimeFormatter; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -73,71 +76,115 @@ public class LngCollectorJob { String start = times.split("/")[0]; String end = times.split("/")[1]; + List timeList = getDatesBetween(start,end); + for(String time : timeList){ + String[] codes = code.split(","); + for (String codeParam : codes){ + getInfo(codeParam,time,token); + } + } }else{ String[] codes = code.split(","); for (String codeParam : codes){ - XxlJobHelper.log("start get data:"+codeParam+",time:"+formattedDate); - Map dataMap = LngCollector.getTankGunInfo(codeParam,formattedDate,token); + getInfo(codeParam,formattedDate,token); + } + } + } - String businessDay = dataMap.get("businessDay").toString(); - // 期初库存、本期进货、加气机发出量、数量、单价、本期损溢处理、期末库存 - JSONArray gasSoldList =(JSONArray) dataMap.get("gasSoldList"); + private static List getDatesBetween(String startDateStr, String endDateStr) { + // 定义日期格式 + DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd"); + // 将字符串转换为 LocalDate + LocalDate startDate = LocalDate.parse(startDateStr, formatter); + LocalDate endDate = LocalDate.parse(endDateStr, formatter); - if(gasSoldList.size()>0){ - JSONObject sold = gasSoldList.getJSONObject(0); - SaleDaily daily = new SaleDaily(); - daily.setBusinessDay(businessDay); - daily.setStationCode(code); - daily.setStartVolume(sold.get("startVolume").toString()); - daily.setEndVolume(sold.get("endVolume").toString()); - daily.setPurchaseVolume(sold.get("purchaseVolume").toString()); - daily.setIssuedVolume(sold.get("issuedVolume").toString()); - daily.setProfitLossVolume(sold.get("profitLossVolume").toString()); - - JSONArray gasSoldItemList = sold.getJSONArray("gasSoldItemList"); - if(gasSoldItemList.size()>0){ - JSONObject gasSold = gasSoldItemList.getJSONObject(0); - daily.setUnitPrice(gasSold.get("unitPrice").toString()); - daily.setSalesVolume(gasSold.get("salesVolume").toString()); - } - - dailyServiceImpl.saveOne(daily); - } + List datesInRange = new ArrayList<>(); + // 使用 while 循环获取范围内的所有日期 + while (!startDate.isAfter(endDate)) { + // 将每个日期格式化为字符串并添加到列表中 + datesInRange.add(startDate.format(formatter)); + // 日期递增一天 + startDate = startDate.plusDays(1); + } + return datesInRange; + } - //不同支付类型销售数据统计 - JSONArray gasPaymentList =(JSONArray) dataMap.get("gasPaymentList"); - if(gasSoldList.size()>0){ -// for(Object info : gasPaymentList) { -// JSONObject sold = (JSONObject) JSONObject.toJSON(info); -// SaleDailyInfo daily = new SaleDailyInfo(); -// daily.setBusinessDay(businessDay); -// daily.setStationCode(code); -// daily.setUnitPrice(sold.get("unitPrice")); -// daily.setEndVolume(sold.get("endVolume").toString()); -// daily.setPurchaseVolume(sold.get("purchaseVolume").toString()); -// daily.setIssuedVolume(sold.get("issuedVolume").toString()); -// daily.setProfitLossVolume(sold.get("profitLossVolume").toString()); -// -// JSONArray gasSoldItemList = sold.getJSONArray("gasSoldItemList"); -// if (gasSoldItemList.size() > 0) { -// JSONObject gasSold = gasSoldItemList.getJSONObject(0); -// daily.setUnitPrice(gasSold.get("unitPrice").toString()); -// daily.setSalesVolume(gasSold.get("salesVolume").toString()); -// } -// -// dailyInfoServiceImpl.saveOne(daily); -// } - } - XxlJobHelper.log("start get data:"+codeParam+" end"); + private void getInfo(String codeParam,String formattedDate,String token){ + + + XxlJobHelper.log("start get data:"+codeParam+",time:"+formattedDate); + Map dataMap = LngCollector.getTankGunInfo(codeParam,formattedDate,token); + + XxlJobHelper.log("get data info :"+JSONObject.toJSONString(dataMap)); + + + String businessDay = dataMap.get("businessDay").toString(); + + // 期初库存、本期进货、加气机发出量、数量、单价、本期损溢处理、期末库存 + JSONArray gasSoldList =(JSONArray) dataMap.get("gasSoldList"); + + + + if(gasSoldList.size()>0){ + JSONObject sold = gasSoldList.getJSONObject(0); + SaleDaily daily = new SaleDaily(); + daily.setBusinessDay(businessDay); + daily.setStationCode(codeParam); + daily.setStartVolume(sold.get("startVolume").toString()); + daily.setFuelShortName(sold.get("fuelName").toString()); + daily.setFuelNo(sold.get("fuelNo").toString()); + daily.setEndVolume(sold.get("endVolume").toString()); + daily.setPurchaseVolume(sold.get("purchaseVolume").toString()); + daily.setIssuedVolume(sold.get("issuedVolume").toString()); + daily.setProfitLossVolume(sold.get("profitLossVolume").toString()); + + JSONArray gasSoldItemList = sold.getJSONArray("gasSoldItemList"); + if(gasSoldItemList.size()>0){ + JSONObject gasSold = gasSoldItemList.getJSONObject(0); + daily.setUnitPrice(gasSold.get("unitPrice").toString()); + daily.setSalesVolume(gasSold.get("salesVolume").toString()); } + + dailyServiceImpl.saveOne(daily); } + //不同支付类型销售数据统计 + JSONArray gasPaymentList =(JSONArray) dataMap.get("gasPaymentList"); + if(gasSoldList.size()>0){ + for(Object info : gasPaymentList) { + JSONObject sold = (JSONObject) JSONObject.toJSON(info); + + + JSONArray gasPaymentItemList =(JSONArray) sold.get("gasPaymentItemList"); + + XxlJobHelper.log("gasPaymentList get data info :"+JSONObject.toJSONString(sold)); + + for(Object item : gasPaymentItemList) { + + JSONObject gasItem = (JSONObject) JSONObject.toJSON(item); + XxlJobHelper.log("gasPaymentItemList get data info :"+JSONObject.toJSONString(gasItem)); + + SaleDailyInfo daily = new SaleDailyInfo(); + daily.setBusinessDay(businessDay); + daily.setStationCode(codeParam); + daily.setUnitPrice(gasItem.get("unitPrice").toString()); + daily.setSalesVolume(gasItem.get("salesVolume").toString()); + daily.setFuelNo(gasItem.get("fuelNo").toString()); + daily.setFuelShortName(gasItem.get("fuelName").toString()); + daily.setPaymentMethod(PaymentMethod.getName(sold.get("paymentMethod").toString())); + daily.setReceivedAmount(gasItem.get("receivedAmount").toString()); + daily.setTotalPaymentAmount(sold.get("totalPaymentAmount").toString()); + dailyInfoServiceImpl.saveOne(daily); + } + } + } + XxlJobHelper.log("start get data:"+codeParam+" end"); } diff --git a/xxl-job-core/pom.xml b/xxl-job-core/pom.xml index df93c522..5afda271 100644 --- a/xxl-job-core/pom.xml +++ b/xxl-job-core/pom.xml @@ -7,7 +7,7 @@ 2.4.2-SNAPSHOT xxl-job-core - pom + jar ${project.artifactId} A distributed task scheduling framework. @@ -72,4 +72,4 @@ https://public.hidotcom.cn/libabc/repository/fdrepo/ - \ No newline at end of file + From 13f20a24f06d3de71ce62bb5a48cd5972cbff7f3 Mon Sep 17 00:00:00 2001 From: xieke Date: Sat, 28 Sep 2024 17:19:12 +0800 Subject: [PATCH 11/12] xxl job for qpsy collector --- .../pom.xml | 12 ++ .../qp/stationv2/config/Argument.java | 90 +++++++++++++++ .../jobhandler/QpStationV2CollectorJob.java | 105 +++++++++++++++++- 3 files changed, 202 insertions(+), 5 deletions(-) create mode 100644 wanhua-executor-plugins/wanhua-frameless-qp-stationv2-collector-plugin/src/main/java/com/wanhua/frameless/qp/stationv2/config/Argument.java diff --git a/wanhua-executor-plugins/wanhua-frameless-qp-stationv2-collector-plugin/pom.xml b/wanhua-executor-plugins/wanhua-frameless-qp-stationv2-collector-plugin/pom.xml index 19974e94..a74a3435 100644 --- a/wanhua-executor-plugins/wanhua-frameless-qp-stationv2-collector-plugin/pom.xml +++ b/wanhua-executor-plugins/wanhua-frameless-qp-stationv2-collector-plugin/pom.xml @@ -50,6 +50,18 @@ + + com.beust + jcommander + 1.78 + + + + com.wanhua.sitedata.sync + sitedata_sync_tools + 1.0-QP-SNAPSHOT + + diff --git a/wanhua-executor-plugins/wanhua-frameless-qp-stationv2-collector-plugin/src/main/java/com/wanhua/frameless/qp/stationv2/config/Argument.java b/wanhua-executor-plugins/wanhua-frameless-qp-stationv2-collector-plugin/src/main/java/com/wanhua/frameless/qp/stationv2/config/Argument.java new file mode 100644 index 00000000..c597db78 --- /dev/null +++ b/wanhua-executor-plugins/wanhua-frameless-qp-stationv2-collector-plugin/src/main/java/com/wanhua/frameless/qp/stationv2/config/Argument.java @@ -0,0 +1,90 @@ +package com.wanhua.frameless.qp.stationv2.config; + +import com.beust.jcommander.JCommander; +import com.beust.jcommander.Parameter; +import com.beust.jcommander.ParameterException; +import lombok.Data; + +@Data +public class Argument { + @Parameter( + names = "--stationId", + description = "station Id to sync, using station name", + required = false + ) + private String stationId; + + @Parameter( + names = "--city", + description = "city name to sync", + required = false + ) + private String city; + + @Parameter( + names = "--om", + description = "om name to sync", + required = false + ) + private String om; + + @Parameter( + names = "--tm", + description = "tm name to sync", + required = false + ) + private String tm; + + @Parameter( + names = "--startTime", + description = "startTime to sync, yyyy-MM-dd", + required = false + ) + private String startTime; + + + @Parameter( + names = "--endTime", + description = "endTime to sync, yyyy-MM-dd", + required = false + ) + private String endTime; + + @Parameter( + names = "--clearDataFirst", + description = "clear data first", + required = false + ) + private boolean clearDataFirst; + + + public static Argument parseAgument(String args) throws ParameterException { + Argument jArgs = new Argument(); + JCommander cmd = JCommander.newBuilder() + .addObject(jArgs) + .build(); + cmd.parse(args); + + if (jArgs.stationId == null && jArgs.city == null && jArgs.om == null && jArgs.tm == null) { + throw new ParameterException("stationId or city or om or tm must be set"); + } + + if (jArgs.stationId != null && (jArgs.city != null || jArgs.om != null || jArgs.tm != null)) { + throw new ParameterException("stationId and city or om or tm can not be set together"); + } + + if (jArgs.city != null && (jArgs.om != null || jArgs.tm != null)) { + throw new ParameterException("city and om or tm can not be set together"); + } + + if (jArgs.om != null && jArgs.tm != null) { + throw new ParameterException("om and tm can not be set together"); + } + + if (jArgs.startTime != null && jArgs.endTime == null || jArgs.endTime != null && jArgs.startTime == null) { + throw new ParameterException("startTime or endTime must be set together"); + } + return jArgs; + } + +} diff --git a/wanhua-executor-plugins/wanhua-frameless-qp-stationv2-collector-plugin/src/main/java/com/wanhua/frameless/qp/stationv2/jobhandler/QpStationV2CollectorJob.java b/wanhua-executor-plugins/wanhua-frameless-qp-stationv2-collector-plugin/src/main/java/com/wanhua/frameless/qp/stationv2/jobhandler/QpStationV2CollectorJob.java index bbd1942d..330ef123 100644 --- a/wanhua-executor-plugins/wanhua-frameless-qp-stationv2-collector-plugin/src/main/java/com/wanhua/frameless/qp/stationv2/jobhandler/QpStationV2CollectorJob.java +++ b/wanhua-executor-plugins/wanhua-frameless-qp-stationv2-collector-plugin/src/main/java/com/wanhua/frameless/qp/stationv2/jobhandler/QpStationV2CollectorJob.java @@ -1,5 +1,18 @@ package com.wanhua.frameless.qp.stationv2.jobhandler; +import com.beust.jcommander.ParameterException; +import com.wanhua.bigdata.kafka.service.IStationShiftControlService; +import com.wanhua.bigdata.kafka.service.impl.StationShiftControlServiceImpl; +import com.wanhua.frameless.qp.stationv2.config.Argument; +import com.wanhua.pump_tank_error_detect_base_dao_service.service.*; +import com.wanhua.pump_tank_error_detect_base_dao_service.service.impl.*; +import com.wanhua.sitedata.sync.CommonShiftDataJob; +import com.wanhua.sygl.service.service.IQpsyDeptService; +import com.wanhua.sygl.service.service.ISyglOriginalDetailService; +import com.wanhua.sygl.service.service.impl.QpsyDeptServiceImpl; +import com.wanhua.sygl.service.service.impl.QpsyOracleServiceImpl; +import com.wanhua.utils.mybaits.CommonMybatisHelper; +import com.wanhua.utils.mybaits.GetCommonMybatisUtils; import com.xxl.job.core.context.XxlJobHelper; import com.xxl.job.core.handler.annotation.XxlJob; import org.slf4j.Logger; @@ -11,8 +24,11 @@ import java.io.DataOutputStream; import java.io.InputStreamReader; import java.net.HttpURLConnection; import java.net.URL; +import java.text.SimpleDateFormat; import java.util.Arrays; +import java.util.Date; import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; /** * XxlJob开发示例(Bean模式) @@ -29,16 +45,95 @@ public class QpStationV2CollectorJob { private static Logger logger = LoggerFactory.getLogger(QpStationV2CollectorJob.class); + Consumer getPrintlog() { + return new Consumer() { + @Override + public void accept(String s) { + XxlJobHelper.log(s); + } + }; + } + + Consumer getPrintthrowable() { + return new Consumer() { + @Override + public void accept(Throwable throwable) { + XxlJobHelper.log(throwable); + } + }; + } /** * 1、简单任务示例(Bean模式) */ - @XxlJob("demoJobHandler") + @XxlJob("qp_collector") public void demoJobHandler() throws Exception { - XxlJobHelper.log("XXL-JOB, Hello World."); + XxlJobHelper.log("QP Collector begin."); + + String param = XxlJobHelper.getJobParam(); + Argument argument = null; + try { + argument = Argument.parseAgument(param); + } catch (ParameterException e) { + XxlJobHelper.log("parse argument failed "); + XxlJobHelper.log(e); + XxlJobHelper.handleFail(); + return; + } + + CommonMybatisHelper instance = GetCommonMybatisUtils.getMybatisUtilsByEnv("sygl"); + CommonMybatisHelper oracleTradeInstance = GetCommonMybatisUtils.getMybatisUtilsByEnv("qporacle"); + + IStationService stationService = new StationServiceImpl(instance); + IDetectSetsService detectSetsService = new DetectSetsServiceImpl(instance); + IShiftTankDeliveryService shiftTankDeliveryService = new ShiftTankDeliveryServiceImpl(instance); + IPumpRelationService pumpRelationService = new PumpRelationService(instance); + IPumpService pumpService = new PumpService(instance); + IWeatherTemperatureDataService weatherTemperatureDataService = new WeatherTemperatureDataServiceImpl(instance); + IStationShiftControlService shiftControlService = new StationShiftControlServiceImpl(instance); + IQpsyDeptService qpsyDeptService = new QpsyDeptServiceImpl(instance); + ISyglOriginalDetailService syglOriginalDetailService = new QpsyOracleServiceImpl(oracleTradeInstance); + ILiquidLevelService liquidLevelService = new LiquidLevelService(instance); + IStationDataCaptureStatusService stationDataCaptureStatusService = new StationDataCaptureStatusServiceImpl(instance); + + if (argument.getStationId() == null) { + XxlJobHelper.log("stationId is null, exit"); + return; + } + + String priorityStations = argument.getStationId(); + SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd"); + Date startFetchDate = sdf.parse(sdf.format(new Date())); + Date endFetchDate = startFetchDate; + if (argument.getStartTime() != null && argument.getEndTime() != null) { + startFetchDate = sdf.parse(argument.getStartTime()); + endFetchDate = sdf.parse(argument.getEndTime()); + } - for (int i = 0; i < 5; i++) { - XxlJobHelper.log("beat at:" + i); - TimeUnit.SECONDS.sleep(2); + CommonShiftDataJob job = new CommonShiftDataJob( + priorityStations, + stationService, + detectSetsService, + shiftTankDeliveryService, + pumpRelationService, + null, + pumpService, + stationDataCaptureStatusService, + qpsyDeptService, + syglOriginalDetailService, + liquidLevelService, + shiftControlService, + startFetchDate, + endFetchDate, + getPrintlog(), + getPrintthrowable()); + + try { + job.run(); + } catch (Exception e) { + XxlJobHelper.log("collector failed"); + XxlJobHelper.log(e); + XxlJobHelper.handleFail(); + return; } // default success } From 150a38a854fa660a903617bb86f6b0b58526b5a0 Mon Sep 17 00:00:00 2001 From: xieke Date: Mon, 7 Oct 2024 09:45:22 +0800 Subject: [PATCH 12/12] =?UTF-8?q?=E8=AF=BB=E5=8F=96config=E4=B8=AD?= =?UTF-8?q?=E7=9A=84=E9=85=8D=E7=BD=AE=E6=96=87=E4=BB=B6xxl-job-executor.p?= =?UTF-8?q?roperties?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../qp/stationv2/config/QpCollectorJobConfig.java | 12 ++++++++---- .../jobhandler/QpStationV2CollectorJob.java | 1 - .../src/main/resources/xxl-job-executor.properties | 4 ++-- .../config/StationV3CollectorJobConfig.java | 10 ++++++++-- 4 files changed, 18 insertions(+), 9 deletions(-) diff --git a/wanhua-executor-plugins/wanhua-frameless-qp-stationv2-collector-plugin/src/main/java/com/wanhua/frameless/qp/stationv2/config/QpCollectorJobConfig.java b/wanhua-executor-plugins/wanhua-frameless-qp-stationv2-collector-plugin/src/main/java/com/wanhua/frameless/qp/stationv2/config/QpCollectorJobConfig.java index fd3a8ba8..4040689a 100644 --- a/wanhua-executor-plugins/wanhua-frameless-qp-stationv2-collector-plugin/src/main/java/com/wanhua/frameless/qp/stationv2/config/QpCollectorJobConfig.java +++ b/wanhua-executor-plugins/wanhua-frameless-qp-stationv2-collector-plugin/src/main/java/com/wanhua/frameless/qp/stationv2/config/QpCollectorJobConfig.java @@ -6,8 +6,7 @@ import com.xxl.job.core.executor.impl.XxlJobSimpleExecutor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.io.InputStreamReader; +import java.io.*; import java.util.Arrays; import java.util.Properties; @@ -70,8 +69,13 @@ public class QpCollectorJobConfig { InputStreamReader in = null; try { ClassLoader loder = Thread.currentThread().getContextClassLoader(); - - in = new InputStreamReader(loder.getResourceAsStream(propertyFileName), "UTF-8");; + String configFileName = "config/"+propertyFileName; + try { + in = new FileReader(configFileName); + logger.info("init from out config succeed for {}", configFileName); + } catch (Throwable e) { + in = new InputStreamReader(loder.getResourceAsStream(propertyFileName), "UTF-8");; + } if (in != null) { Properties prop = new Properties(); prop.load(in); diff --git a/wanhua-executor-plugins/wanhua-frameless-qp-stationv2-collector-plugin/src/main/java/com/wanhua/frameless/qp/stationv2/jobhandler/QpStationV2CollectorJob.java b/wanhua-executor-plugins/wanhua-frameless-qp-stationv2-collector-plugin/src/main/java/com/wanhua/frameless/qp/stationv2/jobhandler/QpStationV2CollectorJob.java index 330ef123..883c4747 100644 --- a/wanhua-executor-plugins/wanhua-frameless-qp-stationv2-collector-plugin/src/main/java/com/wanhua/frameless/qp/stationv2/jobhandler/QpStationV2CollectorJob.java +++ b/wanhua-executor-plugins/wanhua-frameless-qp-stationv2-collector-plugin/src/main/java/com/wanhua/frameless/qp/stationv2/jobhandler/QpStationV2CollectorJob.java @@ -27,7 +27,6 @@ import java.net.URL; import java.text.SimpleDateFormat; import java.util.Arrays; import java.util.Date; -import java.util.concurrent.TimeUnit; import java.util.function.Consumer; /** diff --git a/wanhua-executor-plugins/wanhua-frameless-qp-stationv2-collector-plugin/src/main/resources/xxl-job-executor.properties b/wanhua-executor-plugins/wanhua-frameless-qp-stationv2-collector-plugin/src/main/resources/xxl-job-executor.properties index d5670185..534e4ff4 100644 --- a/wanhua-executor-plugins/wanhua-frameless-qp-stationv2-collector-plugin/src/main/resources/xxl-job-executor.properties +++ b/wanhua-executor-plugins/wanhua-frameless-qp-stationv2-collector-plugin/src/main/resources/xxl-job-executor.properties @@ -1,5 +1,5 @@ ### xxl-job admin address list, such as "http://address" or "http://address01,http://address02" -xxl.job.admin.addresses=http://172.22.62.105/internal/etl/xxl-job-admin +xxl.job.admin.addresses=https://172.22.62.105/internal/etl/xxl-job-admin ### xxl-job, access token xxl.job.accessToken=default_token @@ -7,7 +7,7 @@ xxl.job.accessToken=default_token ### xxl-job executor appname xxl.job.executor.appname=qpsy-interbase2 ### xxl-job executor registry-address: default use address to registry , otherwise use ip:port if address is null -xxl.job.executor.address=http://172.22.62.105 +xxl.job.executor.address=https://172.22.62.105 ### xxl-job executor server-info xxl.job.executor.ip= xxl.job.executor.port=19998 diff --git a/wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/src/main/java/com/wanhua/frameless/stationv3/config/StationV3CollectorJobConfig.java b/wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/src/main/java/com/wanhua/frameless/stationv3/config/StationV3CollectorJobConfig.java index ec693006..b9c51614 100644 --- a/wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/src/main/java/com/wanhua/frameless/stationv3/config/StationV3CollectorJobConfig.java +++ b/wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/src/main/java/com/wanhua/frameless/stationv3/config/StationV3CollectorJobConfig.java @@ -7,6 +7,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.wanhua.frameless.stationv3.jobhandler.StationV3CollectorJob; +import java.io.FileReader; import java.io.IOException; import java.io.InputStreamReader; import java.util.Arrays; @@ -71,8 +72,13 @@ public class StationV3CollectorJobConfig { InputStreamReader in = null; try { ClassLoader loder = Thread.currentThread().getContextClassLoader(); - - in = new InputStreamReader(loder.getResourceAsStream(propertyFileName), "UTF-8");; + String configFileName = "config/"+propertyFileName; + try { + in = new FileReader(configFileName); + logger.info("init from out config succeed for {}", configFileName); + } catch (Throwable e) { + in = new InputStreamReader(loder.getResourceAsStream(propertyFileName), "UTF-8");; + } if (in != null) { Properties prop = new Properties(); prop.load(in);