diff --git a/pom.xml b/pom.xml index ad5a2b4e..9d0058ae 100644 --- a/pom.xml +++ b/pom.xml @@ -14,7 +14,8 @@ xxl-job-core xxl-job-admin xxl-job-executor-samples - + wanhua-executor-plugins + UTF-8 diff --git a/wanhua-executor-plugins/pom.xml b/wanhua-executor-plugins/pom.xml new file mode 100644 index 00000000..ccce2347 --- /dev/null +++ b/wanhua-executor-plugins/pom.xml @@ -0,0 +1,24 @@ + + + + xxl-job + com.xuxueli + 2.4.2-SNAPSHOT + + 4.0.0 + + wanhua-executor-plugins + pom + + wanhua-frameless-stationv3-collector-plugin + wanhua-frameless-qp-stationv2-collector-plugin + + + + 8 + 8 + + + \ No newline at end of file diff --git a/wanhua-executor-plugins/wanhua-frameless-qp-stationv2-collector-plugin/pom.xml b/wanhua-executor-plugins/wanhua-frameless-qp-stationv2-collector-plugin/pom.xml new file mode 100644 index 00000000..a74a3435 --- /dev/null +++ b/wanhua-executor-plugins/wanhua-frameless-qp-stationv2-collector-plugin/pom.xml @@ -0,0 +1,112 @@ + + + + wanhua-executor-plugins + com.xuxueli + 2.4.2-SNAPSHOT + + 4.0.0 + + wanhua-frameless-qp-stationv2-collector-plugin + + + 8 + 8 + + + + + org.slf4j + slf4j-reload4j + ${slf4j-api.version} + + + + org.junit.jupiter + junit-jupiter-engine + ${junit-jupiter.version} + test + + + + + com.xuxueli + xxl-job-core + ${project.parent.version} + + + + + com.wanhua + stationv3-base-dao-service + 1.0-QP-SNAPSHOT + + + org.slf4j + slf4j-api + + + + + + com.beust + jcommander + 1.78 + + + + com.wanhua.sitedata.sync + sitedata_sync_tools + 1.0-QP-SNAPSHOT + + + + + + + + + org.apache.maven.plugins + maven-jar-plugin + + + + false + + + true + + lib/ + + com.wanhua.frameless.stationv3.FramelessStationV3CollectorApplication + + + + + + + + org.apache.maven.plugins + maven-assembly-plugin + + + + src/main/assembly/assembly.xml + + + + + make-assembly + package + + single + + + + + + + + \ No newline at end of file diff --git a/wanhua-executor-plugins/wanhua-frameless-qp-stationv2-collector-plugin/src/main/assembly/assembly.xml b/wanhua-executor-plugins/wanhua-frameless-qp-stationv2-collector-plugin/src/main/assembly/assembly.xml new file mode 100644 index 00000000..9fd11c53 --- /dev/null +++ b/wanhua-executor-plugins/wanhua-frameless-qp-stationv2-collector-plugin/src/main/assembly/assembly.xml @@ -0,0 +1,45 @@ + + release + + zip + + false + + + + + + + + + + + + + + + + + + false + lib + false + + + + + + ${project.basedir}/src/main/resources + config + + + ${project.build.directory} + + + *.jar + + + + \ No newline at end of file diff --git a/wanhua-executor-plugins/wanhua-frameless-qp-stationv2-collector-plugin/src/main/java/com/wanhua/frameless/qp/stationv2/FramelessQpStationV2CollectorApplication.java b/wanhua-executor-plugins/wanhua-frameless-qp-stationv2-collector-plugin/src/main/java/com/wanhua/frameless/qp/stationv2/FramelessQpStationV2CollectorApplication.java new file mode 100644 index 00000000..066765fe --- /dev/null +++ b/wanhua-executor-plugins/wanhua-frameless-qp-stationv2-collector-plugin/src/main/java/com/wanhua/frameless/qp/stationv2/FramelessQpStationV2CollectorApplication.java @@ -0,0 +1,38 @@ +package com.wanhua.frameless.qp.stationv2; + +import com.wanhua.frameless.qp.stationv2.config.QpCollectorJobConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.TimeUnit; + +/** + * @author xuxueli 2018-10-31 19:05:43 + */ +public class FramelessQpStationV2CollectorApplication { + private static Logger logger = LoggerFactory.getLogger(FramelessQpStationV2CollectorApplication.class); + + public static void main(String[] args) { + + try { + // start + QpCollectorJobConfig.getInstance().initXxlJobExecutor(); + + // Blocks until interrupted + while (true) { + try { + TimeUnit.HOURS.sleep(1); + } catch (InterruptedException e) { + break; + } + } + } catch (Exception e) { + logger.error(e.getMessage(), e); + } finally { + // destroy + QpCollectorJobConfig.getInstance().destroyXxlJobExecutor(); + } + + } + +} diff --git a/wanhua-executor-plugins/wanhua-frameless-qp-stationv2-collector-plugin/src/main/java/com/wanhua/frameless/qp/stationv2/config/Argument.java b/wanhua-executor-plugins/wanhua-frameless-qp-stationv2-collector-plugin/src/main/java/com/wanhua/frameless/qp/stationv2/config/Argument.java new file mode 100644 index 00000000..c597db78 --- /dev/null +++ b/wanhua-executor-plugins/wanhua-frameless-qp-stationv2-collector-plugin/src/main/java/com/wanhua/frameless/qp/stationv2/config/Argument.java @@ -0,0 +1,90 @@ +package com.wanhua.frameless.qp.stationv2.config; + +import com.beust.jcommander.JCommander; +import com.beust.jcommander.Parameter; +import com.beust.jcommander.ParameterException; +import lombok.Data; + +@Data +public class Argument { + @Parameter( + names = "--stationId", + description = "station Id to sync, using station name", + required = false + ) + private String stationId; + + @Parameter( + names = "--city", + description = "city name to sync", + required = false + ) + private String city; + + @Parameter( + names = "--om", + description = "om name to sync", + required = false + ) + private String om; + + @Parameter( + names = "--tm", + description = "tm name to sync", + required = false + ) + private String tm; + + @Parameter( + names = "--startTime", + description = "startTime to sync, yyyy-MM-dd", + required = false + ) + private String startTime; + + + @Parameter( + names = "--endTime", + description = "endTime to sync, yyyy-MM-dd", + required = false + ) + private String endTime; + + @Parameter( + names = "--clearDataFirst", + description = "clear data first", + required = false + ) + private boolean clearDataFirst; + + + public static Argument parseAgument(String args) throws ParameterException { + Argument jArgs = new Argument(); + JCommander cmd = JCommander.newBuilder() + .addObject(jArgs) + .build(); + cmd.parse(args); + + if (jArgs.stationId == null && jArgs.city == null && jArgs.om == null && jArgs.tm == null) { + throw new ParameterException("stationId or city or om or tm must be set"); + } + + if (jArgs.stationId != null && (jArgs.city != null || jArgs.om != null || jArgs.tm != null)) { + throw new ParameterException("stationId and city or om or tm can not be set together"); + } + + if (jArgs.city != null && (jArgs.om != null || jArgs.tm != null)) { + throw new ParameterException("city and om or tm can not be set together"); + } + + if (jArgs.om != null && jArgs.tm != null) { + throw new ParameterException("om and tm can not be set together"); + } + + if (jArgs.startTime != null && jArgs.endTime == null || jArgs.endTime != null && jArgs.startTime == null) { + throw new ParameterException("startTime or endTime must be set together"); + } + return jArgs; + } + +} diff --git a/wanhua-executor-plugins/wanhua-frameless-qp-stationv2-collector-plugin/src/main/java/com/wanhua/frameless/qp/stationv2/config/QpCollectorJobConfig.java b/wanhua-executor-plugins/wanhua-frameless-qp-stationv2-collector-plugin/src/main/java/com/wanhua/frameless/qp/stationv2/config/QpCollectorJobConfig.java new file mode 100644 index 00000000..4040689a --- /dev/null +++ b/wanhua-executor-plugins/wanhua-frameless-qp-stationv2-collector-plugin/src/main/java/com/wanhua/frameless/qp/stationv2/config/QpCollectorJobConfig.java @@ -0,0 +1,98 @@ +package com.wanhua.frameless.qp.stationv2.config; + + +import com.wanhua.frameless.qp.stationv2.jobhandler.QpStationV2CollectorJob; +import com.xxl.job.core.executor.impl.XxlJobSimpleExecutor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.*; +import java.util.Arrays; +import java.util.Properties; + +/** + * @author xuxueli 2018-10-31 19:05:43 + */ +public class QpCollectorJobConfig { + private static Logger logger = LoggerFactory.getLogger(QpCollectorJobConfig.class); + + + private static QpCollectorJobConfig instance = new QpCollectorJobConfig(); + public static QpCollectorJobConfig getInstance() { + return instance; + } + + + private XxlJobSimpleExecutor xxlJobExecutor = null; + + /** + * init + */ + public void initXxlJobExecutor() { + + // load executor prop + Properties xxlJobProp = loadProperties("xxl-job-executor.properties"); + + // init executor + xxlJobExecutor = new XxlJobSimpleExecutor(); + xxlJobExecutor.setAdminAddresses(xxlJobProp.getProperty("xxl.job.admin.addresses")); + xxlJobExecutor.setAccessToken(xxlJobProp.getProperty("xxl.job.accessToken")); + xxlJobExecutor.setAppname(xxlJobProp.getProperty("xxl.job.executor.appname")); + xxlJobExecutor.setAddress(xxlJobProp.getProperty("xxl.job.executor.address")); + xxlJobExecutor.setIp(xxlJobProp.getProperty("xxl.job.executor.ip")); + xxlJobExecutor.setPort(Integer.valueOf(xxlJobProp.getProperty("xxl.job.executor.port"))); + xxlJobExecutor.setLogPath(xxlJobProp.getProperty("xxl.job.executor.logpath")); + xxlJobExecutor.setLogRetentionDays(Integer.valueOf(xxlJobProp.getProperty("xxl.job.executor.logretentiondays"))); + + // registry job bean + xxlJobExecutor.setXxlJobBeanList(Arrays.asList(new QpStationV2CollectorJob())); + + // start executor + try { + xxlJobExecutor.start(); + } catch (Exception e) { + logger.error(e.getMessage(), e); + } + } + + /** + * destroy + */ + public void destroyXxlJobExecutor() { + if (xxlJobExecutor != null) { + xxlJobExecutor.destroy(); + } + } + + + public static Properties loadProperties(String propertyFileName) { + InputStreamReader in = null; + try { + ClassLoader loder = Thread.currentThread().getContextClassLoader(); + String configFileName = "config/"+propertyFileName; + try { + in = new FileReader(configFileName); + logger.info("init from out config succeed for {}", configFileName); + } catch (Throwable e) { + in = new InputStreamReader(loder.getResourceAsStream(propertyFileName), "UTF-8");; + } + if (in != null) { + Properties prop = new Properties(); + prop.load(in); + return prop; + } + } catch (IOException e) { + logger.error("load {} error!", propertyFileName); + } finally { + if (in != null) { + try { + in.close(); + } catch (IOException e) { + logger.error("close {} error!", propertyFileName); + } + } + } + return null; + } + +} diff --git a/wanhua-executor-plugins/wanhua-frameless-qp-stationv2-collector-plugin/src/main/java/com/wanhua/frameless/qp/stationv2/jobhandler/QpStationV2CollectorJob.java b/wanhua-executor-plugins/wanhua-frameless-qp-stationv2-collector-plugin/src/main/java/com/wanhua/frameless/qp/stationv2/jobhandler/QpStationV2CollectorJob.java new file mode 100644 index 00000000..883c4747 --- /dev/null +++ b/wanhua-executor-plugins/wanhua-frameless-qp-stationv2-collector-plugin/src/main/java/com/wanhua/frameless/qp/stationv2/jobhandler/QpStationV2CollectorJob.java @@ -0,0 +1,345 @@ +package com.wanhua.frameless.qp.stationv2.jobhandler; + +import com.beust.jcommander.ParameterException; +import com.wanhua.bigdata.kafka.service.IStationShiftControlService; +import com.wanhua.bigdata.kafka.service.impl.StationShiftControlServiceImpl; +import com.wanhua.frameless.qp.stationv2.config.Argument; +import com.wanhua.pump_tank_error_detect_base_dao_service.service.*; +import com.wanhua.pump_tank_error_detect_base_dao_service.service.impl.*; +import com.wanhua.sitedata.sync.CommonShiftDataJob; +import com.wanhua.sygl.service.service.IQpsyDeptService; +import com.wanhua.sygl.service.service.ISyglOriginalDetailService; +import com.wanhua.sygl.service.service.impl.QpsyDeptServiceImpl; +import com.wanhua.sygl.service.service.impl.QpsyOracleServiceImpl; +import com.wanhua.utils.mybaits.CommonMybatisHelper; +import com.wanhua.utils.mybaits.GetCommonMybatisUtils; +import com.xxl.job.core.context.XxlJobHelper; +import com.xxl.job.core.handler.annotation.XxlJob; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedInputStream; +import java.io.BufferedReader; +import java.io.DataOutputStream; +import java.io.InputStreamReader; +import java.net.HttpURLConnection; +import java.net.URL; +import java.text.SimpleDateFormat; +import java.util.Arrays; +import java.util.Date; +import java.util.function.Consumer; + +/** + * XxlJob开发示例(Bean模式) + * + * 开发步骤: + * 1、任务开发:在Spring Bean实例中,开发Job方法; + * 2、注解配置:为Job方法添加注解 "@XxlJob(value="自定义jobhandler名称", init = "JobHandler初始化方法", destroy = "JobHandler销毁方法")",注解value值对应的是调度中心新建任务的JobHandler属性的值。 + * 3、执行日志:需要通过 "XxlJobHelper.log" 打印执行日志; + * 4、任务结果:默认任务结果为 "成功" 状态,不需要主动设置;如有诉求,比如设置任务结果为失败,可以通过 "XxlJobHelper.handleFail/handleSuccess" 自主设置任务结果; + * + * @author xuxueli 2019-12-11 21:52:51 + */ +public class QpStationV2CollectorJob { + private static Logger logger = LoggerFactory.getLogger(QpStationV2CollectorJob.class); + + + Consumer getPrintlog() { + return new Consumer() { + @Override + public void accept(String s) { + XxlJobHelper.log(s); + } + }; + } + + Consumer getPrintthrowable() { + return new Consumer() { + @Override + public void accept(Throwable throwable) { + XxlJobHelper.log(throwable); + } + }; + } + /** + * 1、简单任务示例(Bean模式) + */ + @XxlJob("qp_collector") + public void demoJobHandler() throws Exception { + XxlJobHelper.log("QP Collector begin."); + + String param = XxlJobHelper.getJobParam(); + Argument argument = null; + try { + argument = Argument.parseAgument(param); + } catch (ParameterException e) { + XxlJobHelper.log("parse argument failed "); + XxlJobHelper.log(e); + XxlJobHelper.handleFail(); + return; + } + + CommonMybatisHelper instance = GetCommonMybatisUtils.getMybatisUtilsByEnv("sygl"); + CommonMybatisHelper oracleTradeInstance = GetCommonMybatisUtils.getMybatisUtilsByEnv("qporacle"); + + IStationService stationService = new StationServiceImpl(instance); + IDetectSetsService detectSetsService = new DetectSetsServiceImpl(instance); + IShiftTankDeliveryService shiftTankDeliveryService = new ShiftTankDeliveryServiceImpl(instance); + IPumpRelationService pumpRelationService = new PumpRelationService(instance); + IPumpService pumpService = new PumpService(instance); + IWeatherTemperatureDataService weatherTemperatureDataService = new WeatherTemperatureDataServiceImpl(instance); + IStationShiftControlService shiftControlService = new StationShiftControlServiceImpl(instance); + IQpsyDeptService qpsyDeptService = new QpsyDeptServiceImpl(instance); + ISyglOriginalDetailService syglOriginalDetailService = new QpsyOracleServiceImpl(oracleTradeInstance); + ILiquidLevelService liquidLevelService = new LiquidLevelService(instance); + IStationDataCaptureStatusService stationDataCaptureStatusService = new StationDataCaptureStatusServiceImpl(instance); + + if (argument.getStationId() == null) { + XxlJobHelper.log("stationId is null, exit"); + return; + } + + String priorityStations = argument.getStationId(); + SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd"); + Date startFetchDate = sdf.parse(sdf.format(new Date())); + Date endFetchDate = startFetchDate; + if (argument.getStartTime() != null && argument.getEndTime() != null) { + startFetchDate = sdf.parse(argument.getStartTime()); + endFetchDate = sdf.parse(argument.getEndTime()); + } + + CommonShiftDataJob job = new CommonShiftDataJob( + priorityStations, + stationService, + detectSetsService, + shiftTankDeliveryService, + pumpRelationService, + null, + pumpService, + stationDataCaptureStatusService, + qpsyDeptService, + syglOriginalDetailService, + liquidLevelService, + shiftControlService, + startFetchDate, + endFetchDate, + getPrintlog(), + getPrintthrowable()); + + try { + job.run(); + } catch (Exception e) { + XxlJobHelper.log("collector failed"); + XxlJobHelper.log(e); + XxlJobHelper.handleFail(); + return; + } + // default success + } + + + /** + * 2、分片广播任务 + */ + @XxlJob("shardingJobHandler") + public void shardingJobHandler() throws Exception { + + // 分片参数 + int shardIndex = XxlJobHelper.getShardIndex(); + int shardTotal = XxlJobHelper.getShardTotal(); + + XxlJobHelper.log("分片参数:当前分片序号 = {}, 总分片数 = {}", shardIndex, shardTotal); + + // 业务逻辑 + for (int i = 0; i < shardTotal; i++) { + if (i == shardIndex) { + XxlJobHelper.log("第 {} 片, 命中分片开始处理", i); + } else { + XxlJobHelper.log("第 {} 片, 忽略", i); + } + } + + } + + + /** + * 3、命令行任务 + */ + @XxlJob("commandJobHandler") + public void commandJobHandler() throws Exception { + String command = XxlJobHelper.getJobParam(); + int exitValue = -1; + + BufferedReader bufferedReader = null; + try { + // command process + ProcessBuilder processBuilder = new ProcessBuilder(); + processBuilder.command(command); + processBuilder.redirectErrorStream(true); + + Process process = processBuilder.start(); + //Process process = Runtime.getRuntime().exec(command); + + BufferedInputStream bufferedInputStream = new BufferedInputStream(process.getInputStream()); + bufferedReader = new BufferedReader(new InputStreamReader(bufferedInputStream)); + + // command log + String line; + while ((line = bufferedReader.readLine()) != null) { + XxlJobHelper.log(line); + } + + // command exit + process.waitFor(); + exitValue = process.exitValue(); + } catch (Exception e) { + XxlJobHelper.log(e); + } finally { + if (bufferedReader != null) { + bufferedReader.close(); + } + } + + if (exitValue == 0) { + // default success + } else { + XxlJobHelper.handleFail("command exit value("+exitValue+") is failed"); + } + + } + + + /** + * 4、跨平台Http任务 + * 参数示例: + * "url: http://www.baidu.com\n" + + * "method: get\n" + + * "data: content\n"; + */ + @XxlJob("httpJobHandler") + public void httpJobHandler() throws Exception { + + // param parse + String param = XxlJobHelper.getJobParam(); + if (param==null || param.trim().length()==0) { + XxlJobHelper.log("param["+ param +"] invalid."); + + XxlJobHelper.handleFail(); + return; + } + + String[] httpParams = param.split("\n"); + String url = null; + String method = null; + String data = null; + for (String httpParam: httpParams) { + if (httpParam.startsWith("url:")) { + url = httpParam.substring(httpParam.indexOf("url:") + 4).trim(); + } + if (httpParam.startsWith("method:")) { + method = httpParam.substring(httpParam.indexOf("method:") + 7).trim().toUpperCase(); + } + if (httpParam.startsWith("data:")) { + data = httpParam.substring(httpParam.indexOf("data:") + 5).trim(); + } + } + + // param valid + if (url==null || url.trim().length()==0) { + XxlJobHelper.log("url["+ url +"] invalid."); + + XxlJobHelper.handleFail(); + return; + } + if (method==null || !Arrays.asList("GET", "POST").contains(method)) { + XxlJobHelper.log("method["+ method +"] invalid."); + + XxlJobHelper.handleFail(); + return; + } + boolean isPostMethod = method.equals("POST"); + + // request + HttpURLConnection connection = null; + BufferedReader bufferedReader = null; + try { + // connection + URL realUrl = new URL(url); + connection = (HttpURLConnection) realUrl.openConnection(); + + // connection setting + connection.setRequestMethod(method); + connection.setDoOutput(isPostMethod); + connection.setDoInput(true); + connection.setUseCaches(false); + connection.setReadTimeout(5 * 1000); + connection.setConnectTimeout(3 * 1000); + connection.setRequestProperty("connection", "Keep-Alive"); + connection.setRequestProperty("Content-Type", "application/json;charset=UTF-8"); + connection.setRequestProperty("Accept-Charset", "application/json;charset=UTF-8"); + + // do connection + connection.connect(); + + // data + if (isPostMethod && data!=null && data.trim().length()>0) { + DataOutputStream dataOutputStream = new DataOutputStream(connection.getOutputStream()); + dataOutputStream.write(data.getBytes("UTF-8")); + dataOutputStream.flush(); + dataOutputStream.close(); + } + + // valid StatusCode + int statusCode = connection.getResponseCode(); + if (statusCode != 200) { + throw new RuntimeException("Http Request StatusCode(" + statusCode + ") Invalid."); + } + + // result + bufferedReader = new BufferedReader(new InputStreamReader(connection.getInputStream(), "UTF-8")); + StringBuilder result = new StringBuilder(); + String line; + while ((line = bufferedReader.readLine()) != null) { + result.append(line); + } + String responseMsg = result.toString(); + + XxlJobHelper.log(responseMsg); + + return; + } catch (Exception e) { + XxlJobHelper.log(e); + + XxlJobHelper.handleFail(); + return; + } finally { + try { + if (bufferedReader != null) { + bufferedReader.close(); + } + if (connection != null) { + connection.disconnect(); + } + } catch (Exception e2) { + XxlJobHelper.log(e2); + } + } + + } + + /** + * 5、生命周期任务示例:任务初始化与销毁时,支持自定义相关逻辑; + */ + @XxlJob(value = "demoJobHandler2", init = "init", destroy = "destroy") + public void demoJobHandler2() throws Exception { + XxlJobHelper.log("XXL-JOB, Hello World."); + } + public void init(){ + logger.info("init"); + } + public void destroy(){ + logger.info("destroy"); + } + + +} diff --git a/wanhua-executor-plugins/wanhua-frameless-qp-stationv2-collector-plugin/src/main/resources/log4j.xml b/wanhua-executor-plugins/wanhua-frameless-qp-stationv2-collector-plugin/src/main/resources/log4j.xml new file mode 100644 index 00000000..da3a9721 --- /dev/null +++ b/wanhua-executor-plugins/wanhua-frameless-qp-stationv2-collector-plugin/src/main/resources/log4j.xml @@ -0,0 +1,27 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/wanhua-executor-plugins/wanhua-frameless-qp-stationv2-collector-plugin/src/main/resources/xxl-job-executor.properties b/wanhua-executor-plugins/wanhua-frameless-qp-stationv2-collector-plugin/src/main/resources/xxl-job-executor.properties new file mode 100644 index 00000000..534e4ff4 --- /dev/null +++ b/wanhua-executor-plugins/wanhua-frameless-qp-stationv2-collector-plugin/src/main/resources/xxl-job-executor.properties @@ -0,0 +1,17 @@ +### xxl-job admin address list, such as "http://address" or "http://address01,http://address02" +xxl.job.admin.addresses=https://172.22.62.105/internal/etl/xxl-job-admin + +### xxl-job, access token +xxl.job.accessToken=default_token + +### xxl-job executor appname +xxl.job.executor.appname=qpsy-interbase2 +### xxl-job executor registry-address: default use address to registry , otherwise use ip:port if address is null +xxl.job.executor.address=https://172.22.62.105 +### xxl-job executor server-info +xxl.job.executor.ip= +xxl.job.executor.port=19998 +### xxl-job executor log-path +xxl.job.executor.logpath=logs/xxl-job/jobhandler +### xxl-job executor log-retention-days +xxl.job.executor.logretentiondays=30 \ No newline at end of file diff --git a/wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/pom.xml b/wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/pom.xml new file mode 100644 index 00000000..14a96fb6 --- /dev/null +++ b/wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/pom.xml @@ -0,0 +1,117 @@ + + + + wanhua-executor-plugins + com.xuxueli + 2.4.2-SNAPSHOT + + 4.0.0 + + wanhua-frameless-stationv3-collector-plugin + + + 8 + 8 + + + + + + org.slf4j + slf4j-reload4j + ${slf4j-api.version} + + + org.slf4j + slf4j-api + ${slf4j-api.version} + + + + org.junit.jupiter + junit-jupiter-engine + ${junit-jupiter.version} + test + + + + + com.xuxueli + xxl-job-core + ${project.parent.version} + + + + com.wanhua + stationv3-http-data-collector + 1.0-SNAPSHOT + + + org.slf4j + slf4j-api + + + + + + com.wanhua + stationv3-base-dao-service + 1.0-SNAPSHOT + + + org.slf4j + slf4j-api + + + + + + + + + + + org.apache.maven.plugins + maven-jar-plugin + 3.4.2 + + + + false + + + true + + lib/ + + com.wanhua.frameless.stationv3.FramelessStationV3CollectorApplication + + + + + + + + org.apache.maven.plugins + maven-assembly-plugin + + + + src/main/assembly/assembly.xml + + + + + make-assembly + package + + single + + + + + + + diff --git a/wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/src/main/assembly/assembly.xml b/wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/src/main/assembly/assembly.xml new file mode 100644 index 00000000..9fd11c53 --- /dev/null +++ b/wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/src/main/assembly/assembly.xml @@ -0,0 +1,45 @@ + + release + + zip + + false + + + + + + + + + + + + + + + + + + false + lib + false + + + + + + ${project.basedir}/src/main/resources + config + + + ${project.build.directory} + + + *.jar + + + + \ No newline at end of file diff --git a/wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/src/main/java/com/wanhua/frameless/stationv3/FramelessStationV3CollectorApplication.java b/wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/src/main/java/com/wanhua/frameless/stationv3/FramelessStationV3CollectorApplication.java new file mode 100644 index 00000000..834fdfdc --- /dev/null +++ b/wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/src/main/java/com/wanhua/frameless/stationv3/FramelessStationV3CollectorApplication.java @@ -0,0 +1,38 @@ +package com.wanhua.frameless.stationv3; + +import com.wanhua.frameless.stationv3.config.StationV3CollectorJobConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.TimeUnit; + +/** + * @author xuxueli 2018-10-31 19:05:43 + */ +public class FramelessStationV3CollectorApplication { + private static Logger logger = LoggerFactory.getLogger(FramelessStationV3CollectorApplication.class); + + public static void main(String[] args) { + + try { + // start + StationV3CollectorJobConfig.getInstance().initXxlJobExecutor(); + + // Blocks until interrupted + while (true) { + try { + TimeUnit.HOURS.sleep(1); + } catch (InterruptedException e) { + break; + } + } + } catch (Exception e) { + logger.error(e.getMessage(), e); + } finally { + // destroy + StationV3CollectorJobConfig.getInstance().destroyXxlJobExecutor(); + } + + } + +} diff --git a/wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/src/main/java/com/wanhua/frameless/stationv3/config/StationV3CollectorJobConfig.java b/wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/src/main/java/com/wanhua/frameless/stationv3/config/StationV3CollectorJobConfig.java new file mode 100644 index 00000000..b9c51614 --- /dev/null +++ b/wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/src/main/java/com/wanhua/frameless/stationv3/config/StationV3CollectorJobConfig.java @@ -0,0 +1,101 @@ +package com.wanhua.frameless.stationv3.config; + + +import com.wanhua.frameless.stationv3.jobhandler.LngCollectorJob; +import com.xxl.job.core.executor.impl.XxlJobSimpleExecutor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import com.wanhua.frameless.stationv3.jobhandler.StationV3CollectorJob; + +import java.io.FileReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.util.Arrays; +import java.util.Properties; + +/** + * @author xuxueli 2018-10-31 19:05:43 + */ +public class StationV3CollectorJobConfig { + private static Logger logger = LoggerFactory.getLogger(StationV3CollectorJobConfig.class); + + + private static StationV3CollectorJobConfig instance = new StationV3CollectorJobConfig(); + public static StationV3CollectorJobConfig getInstance() { + return instance; + } + + + private XxlJobSimpleExecutor xxlJobExecutor = null; + + /** + * init + */ + public void initXxlJobExecutor() { + + // load executor prop + Properties xxlJobProp = loadProperties("xxl-job-executor.properties"); + + // init executor + xxlJobExecutor = new XxlJobSimpleExecutor(); + xxlJobExecutor.setAdminAddresses(xxlJobProp.getProperty("xxl.job.admin.addresses")); + xxlJobExecutor.setAccessToken(xxlJobProp.getProperty("xxl.job.accessToken")); + xxlJobExecutor.setAppname(xxlJobProp.getProperty("xxl.job.executor.appname")); + xxlJobExecutor.setAddress(xxlJobProp.getProperty("xxl.job.executor.address")); + xxlJobExecutor.setIp(xxlJobProp.getProperty("xxl.job.executor.ip")); + xxlJobExecutor.setPort(Integer.valueOf(xxlJobProp.getProperty("xxl.job.executor.port"))); + xxlJobExecutor.setLogPath(xxlJobProp.getProperty("xxl.job.executor.logpath")); + xxlJobExecutor.setLogRetentionDays(Integer.valueOf(xxlJobProp.getProperty("xxl.job.executor.logretentiondays"))); + + // registry job bean + xxlJobExecutor.setXxlJobBeanList(Arrays.asList(new StationV3CollectorJob(), new LngCollectorJob())); + + // start executor + try { + xxlJobExecutor.start(); + } catch (Exception e) { + logger.error(e.getMessage(), e); + } + } + + /** + * destroy + */ + public void destroyXxlJobExecutor() { + if (xxlJobExecutor != null) { + xxlJobExecutor.destroy(); + } + } + + + public static Properties loadProperties(String propertyFileName) { + InputStreamReader in = null; + try { + ClassLoader loder = Thread.currentThread().getContextClassLoader(); + String configFileName = "config/"+propertyFileName; + try { + in = new FileReader(configFileName); + logger.info("init from out config succeed for {}", configFileName); + } catch (Throwable e) { + in = new InputStreamReader(loder.getResourceAsStream(propertyFileName), "UTF-8");; + } + if (in != null) { + Properties prop = new Properties(); + prop.load(in); + return prop; + } + } catch (IOException e) { + logger.error("load {} error!", propertyFileName); + } finally { + if (in != null) { + try { + in.close(); + } catch (IOException e) { + logger.error("close {} error!", propertyFileName); + } + } + } + return null; + } + +} diff --git a/wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/src/main/java/com/wanhua/frameless/stationv3/jobhandler/LngCollectorJob.java b/wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/src/main/java/com/wanhua/frameless/stationv3/jobhandler/LngCollectorJob.java new file mode 100644 index 00000000..0b6e1bb3 --- /dev/null +++ b/wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/src/main/java/com/wanhua/frameless/stationv3/jobhandler/LngCollectorJob.java @@ -0,0 +1,206 @@ +package com.wanhua.frameless.stationv3.jobhandler; + +import com.alibaba.fastjson.JSONArray; +import com.alibaba.fastjson.JSONObject; +import com.wanhua.lng.daySale.model.SaleDaily; +import com.wanhua.lng.daySale.model.SaleDailyInfo; +import com.wanhua.lng.daySale.service.impl.SaleDailyInfoServiceImpl; +import com.wanhua.lng.daySale.service.impl.SaleDailyServiceImpl; +import com.wanhua.lng.daySale.utils.PaymentMethod; +import com.wanhua.stationservice.baseDaoService.service.impl.PumpServiceImpl; +import com.wanhua.stationservice.baseutils.security.CommonBasedAESUtil; +import com.wanhua.stationv3.data.collector.CollectorTest; +import com.wanhua.stationv3.data.collector.LngCollector; +import com.wanhua.stationv3.data.collector.TokenUtil; +import com.xxl.job.core.context.XxlJobHelper; +import com.xxl.job.core.handler.annotation.XxlJob; +import io.netty.util.internal.StringUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.LocalDate; +import java.time.format.DateTimeFormatter; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +/** + * XxlJob开发示例(Bean模式) + * + * 开发步骤: + * 1、任务开发:在Spring Bean实例中,开发Job方法; + * 2、注解配置:为Job方法添加注解 "@XxlJob(value="自定义jobhandler名称", init = "JobHandler初始化方法", destroy = "JobHandler销毁方法")",注解value值对应的是调度中心新建任务的JobHandler属性的值。 + * 3、执行日志:需要通过 "XxlJobHelper.log" 打印执行日志; + * 4、任务结果:默认任务结果为 "成功" 状态,不需要主动设置;如有诉求,比如设置任务结果为失败,可以通过 "XxlJobHelper.handleFail/handleSuccess" 自主设置任务结果; + * + * @author xuxueli 2019-12-11 21:52:51 + */ +public class LngCollectorJob { + private static Logger logger = LoggerFactory.getLogger(LngCollectorJob.class); + + SaleDailyServiceImpl dailyServiceImpl = new SaleDailyServiceImpl(); + SaleDailyInfoServiceImpl dailyInfoServiceImpl = new SaleDailyInfoServiceImpl(); + + + /** + * 1、简单任务示例(Bean模式) + */ + @XxlJob("getSaleDaily") + public void demoJobHandler() throws Exception { + XxlJobHelper.log("XXL-JOB,begin start "); + + String param = XxlJobHelper.getJobParam(); + Map params = JSONObject.parseObject(param,Map.class); + //{"time":"2024-01-01/2024-01-03","code":"1-A6201-C001-S085,1-A6201-C001-S084"} + LocalDate yesterday = LocalDate.now().minusDays(1); + DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd"); + String formattedDate = yesterday.format(formatter); + + String code = params.get("code"); + + String fullToken = TokenUtil.getTokenFromServer("xieke"); + XxlJobHelper.log("token: "+ fullToken); + String secondsFormat = fullToken.substring(0,15); + Long sinceSeconds = Long.parseLong(secondsFormat); + XxlJobHelper.log("since seconds: "+ sinceSeconds); + String tokenFormat = fullToken.substring(15); + String newToken = CommonBasedAESUtil.decrypt("12345678900abcde", "78945612300abcde", tokenFormat); + newToken = newToken.substring(0, newToken.indexOf("SALT1234567890")); + XxlJobHelper.log("realtoken "+ newToken); + String token = newToken; + + + if(!StringUtil.isNullOrEmpty(params.get("time"))){ + String times = params.get("time"); + String start = times.split("/")[0]; + String end = times.split("/")[1]; + + List timeList = getDatesBetween(start,end); + for(String time : timeList){ + String[] codes = code.split(","); + for (String codeParam : codes){ + getInfo(codeParam,time,token); + } + } + + }else{ + String[] codes = code.split(","); + for (String codeParam : codes){ + getInfo(codeParam,formattedDate,token); + } + } + } + + + + private static List getDatesBetween(String startDateStr, String endDateStr) { + // 定义日期格式 + DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd"); + + // 将字符串转换为 LocalDate + LocalDate startDate = LocalDate.parse(startDateStr, formatter); + LocalDate endDate = LocalDate.parse(endDateStr, formatter); + + List datesInRange = new ArrayList<>(); + + // 使用 while 循环获取范围内的所有日期 + while (!startDate.isAfter(endDate)) { + // 将每个日期格式化为字符串并添加到列表中 + datesInRange.add(startDate.format(formatter)); + // 日期递增一天 + startDate = startDate.plusDays(1); + } + return datesInRange; + } + + private void getInfo(String codeParam,String formattedDate,String token){ + + + XxlJobHelper.log("start get data:"+codeParam+",time:"+formattedDate); + Map dataMap = LngCollector.getTankGunInfo(codeParam,formattedDate,token); + + XxlJobHelper.log("get data info :"+JSONObject.toJSONString(dataMap)); + + + String businessDay = dataMap.get("businessDay").toString(); + + // 期初库存、本期进货、加气机发出量、数量、单价、本期损溢处理、期末库存 + JSONArray gasSoldList =(JSONArray) dataMap.get("gasSoldList"); + + + + if(gasSoldList.size()>0){ + JSONObject sold = gasSoldList.getJSONObject(0); + SaleDaily daily = new SaleDaily(); + daily.setBusinessDay(businessDay); + daily.setStationCode(codeParam); + daily.setStartVolume(sold.get("startVolume").toString()); + daily.setFuelShortName(sold.get("fuelName").toString()); + daily.setFuelNo(sold.get("fuelNo").toString()); + daily.setEndVolume(sold.get("endVolume").toString()); + daily.setPurchaseVolume(sold.get("purchaseVolume").toString()); + daily.setIssuedVolume(sold.get("issuedVolume").toString()); + daily.setProfitLossVolume(sold.get("profitLossVolume").toString()); + + JSONArray gasSoldItemList = sold.getJSONArray("gasSoldItemList"); + if(gasSoldItemList.size()>0){ + JSONObject gasSold = gasSoldItemList.getJSONObject(0); + daily.setUnitPrice(gasSold.get("unitPrice").toString()); + daily.setSalesVolume(gasSold.get("salesVolume").toString()); + } + + dailyServiceImpl.saveOne(daily); + } + + + //不同支付类型销售数据统计 + JSONArray gasPaymentList =(JSONArray) dataMap.get("gasPaymentList"); + if(gasSoldList.size()>0){ + for(Object info : gasPaymentList) { + JSONObject sold = (JSONObject) JSONObject.toJSON(info); + + + JSONArray gasPaymentItemList =(JSONArray) sold.get("gasPaymentItemList"); + + XxlJobHelper.log("gasPaymentList get data info :"+JSONObject.toJSONString(sold)); + + for(Object item : gasPaymentItemList) { + + JSONObject gasItem = (JSONObject) JSONObject.toJSON(item); + XxlJobHelper.log("gasPaymentItemList get data info :"+JSONObject.toJSONString(gasItem)); + + SaleDailyInfo daily = new SaleDailyInfo(); + daily.setBusinessDay(businessDay); + daily.setStationCode(codeParam); + daily.setUnitPrice(gasItem.get("unitPrice").toString()); + daily.setSalesVolume(gasItem.get("salesVolume").toString()); + daily.setFuelNo(gasItem.get("fuelNo").toString()); + daily.setFuelShortName(gasItem.get("fuelName").toString()); + daily.setPaymentMethod(PaymentMethod.getName(sold.get("paymentMethod").toString())); + daily.setReceivedAmount(gasItem.get("receivedAmount").toString()); + daily.setTotalPaymentAmount(sold.get("totalPaymentAmount").toString()); + dailyInfoServiceImpl.saveOne(daily); + } + } + } + XxlJobHelper.log("start get data:"+codeParam+" end"); + } + + + /** + * 5、生命周期任务示例:任务初始化与销毁时,支持自定义相关逻辑; + */ + @XxlJob(value = "demoJobHandler2", init = "init", destroy = "destroy") + public void demoJobHandler2() throws Exception { + XxlJobHelper.log("XXL-JOB, Hello World."); + } + public void init(){ + logger.info("init"); + } + public void destroy(){ + logger.info("destroy"); + } + + +} diff --git a/wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/src/main/java/com/wanhua/frameless/stationv3/jobhandler/StationV3CollectorJob.java b/wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/src/main/java/com/wanhua/frameless/stationv3/jobhandler/StationV3CollectorJob.java new file mode 100644 index 00000000..fc66ffa7 --- /dev/null +++ b/wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/src/main/java/com/wanhua/frameless/stationv3/jobhandler/StationV3CollectorJob.java @@ -0,0 +1,251 @@ +package com.wanhua.frameless.stationv3.jobhandler; + +import com.xxl.job.core.context.XxlJobHelper; +import com.xxl.job.core.handler.annotation.XxlJob; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedInputStream; +import java.io.BufferedReader; +import java.io.DataOutputStream; +import java.io.InputStreamReader; +import java.net.HttpURLConnection; +import java.net.URL; +import java.util.Arrays; +import java.util.concurrent.TimeUnit; + +/** + * XxlJob开发示例(Bean模式) + * + * 开发步骤: + * 1、任务开发:在Spring Bean实例中,开发Job方法; + * 2、注解配置:为Job方法添加注解 "@XxlJob(value="自定义jobhandler名称", init = "JobHandler初始化方法", destroy = "JobHandler销毁方法")",注解value值对应的是调度中心新建任务的JobHandler属性的值。 + * 3、执行日志:需要通过 "XxlJobHelper.log" 打印执行日志; + * 4、任务结果:默认任务结果为 "成功" 状态,不需要主动设置;如有诉求,比如设置任务结果为失败,可以通过 "XxlJobHelper.handleFail/handleSuccess" 自主设置任务结果; + * + * @author xuxueli 2019-12-11 21:52:51 + */ +public class StationV3CollectorJob { + private static Logger logger = LoggerFactory.getLogger(StationV3CollectorJob.class); + + + /** + * 1、简单任务示例(Bean模式) + */ + @XxlJob("demoJobHandler") + public void demoJobHandler() throws Exception { + XxlJobHelper.log("XXL-JOB, Hello World."); + + for (int i = 0; i < 5; i++) { + XxlJobHelper.log("beat at:" + i); + TimeUnit.SECONDS.sleep(2); + } + // default success + } + + + /** + * 2、分片广播任务 + */ + @XxlJob("shardingJobHandler") + public void shardingJobHandler() throws Exception { + + // 分片参数 + int shardIndex = XxlJobHelper.getShardIndex(); + int shardTotal = XxlJobHelper.getShardTotal(); + + XxlJobHelper.log("分片参数:当前分片序号 = {}, 总分片数 = {}", shardIndex, shardTotal); + + // 业务逻辑 + for (int i = 0; i < shardTotal; i++) { + if (i == shardIndex) { + XxlJobHelper.log("第 {} 片, 命中分片开始处理", i); + } else { + XxlJobHelper.log("第 {} 片, 忽略", i); + } + } + + } + + + /** + * 3、命令行任务 + */ + @XxlJob("commandJobHandler") + public void commandJobHandler() throws Exception { + String command = XxlJobHelper.getJobParam(); + int exitValue = -1; + + BufferedReader bufferedReader = null; + try { + // command process + ProcessBuilder processBuilder = new ProcessBuilder(); + processBuilder.command(command); + processBuilder.redirectErrorStream(true); + + Process process = processBuilder.start(); + //Process process = Runtime.getRuntime().exec(command); + + BufferedInputStream bufferedInputStream = new BufferedInputStream(process.getInputStream()); + bufferedReader = new BufferedReader(new InputStreamReader(bufferedInputStream)); + + // command log + String line; + while ((line = bufferedReader.readLine()) != null) { + XxlJobHelper.log(line); + } + + // command exit + process.waitFor(); + exitValue = process.exitValue(); + } catch (Exception e) { + XxlJobHelper.log(e); + } finally { + if (bufferedReader != null) { + bufferedReader.close(); + } + } + + if (exitValue == 0) { + // default success + } else { + XxlJobHelper.handleFail("command exit value("+exitValue+") is failed"); + } + + } + + + /** + * 4、跨平台Http任务 + * 参数示例: + * "url: http://www.baidu.com\n" + + * "method: get\n" + + * "data: content\n"; + */ + @XxlJob("httpJobHandler") + public void httpJobHandler() throws Exception { + + // param parse + String param = XxlJobHelper.getJobParam(); + if (param==null || param.trim().length()==0) { + XxlJobHelper.log("param["+ param +"] invalid."); + + XxlJobHelper.handleFail(); + return; + } + + String[] httpParams = param.split("\n"); + String url = null; + String method = null; + String data = null; + for (String httpParam: httpParams) { + if (httpParam.startsWith("url:")) { + url = httpParam.substring(httpParam.indexOf("url:") + 4).trim(); + } + if (httpParam.startsWith("method:")) { + method = httpParam.substring(httpParam.indexOf("method:") + 7).trim().toUpperCase(); + } + if (httpParam.startsWith("data:")) { + data = httpParam.substring(httpParam.indexOf("data:") + 5).trim(); + } + } + + // param valid + if (url==null || url.trim().length()==0) { + XxlJobHelper.log("url["+ url +"] invalid."); + + XxlJobHelper.handleFail(); + return; + } + if (method==null || !Arrays.asList("GET", "POST").contains(method)) { + XxlJobHelper.log("method["+ method +"] invalid."); + + XxlJobHelper.handleFail(); + return; + } + boolean isPostMethod = method.equals("POST"); + + // request + HttpURLConnection connection = null; + BufferedReader bufferedReader = null; + try { + // connection + URL realUrl = new URL(url); + connection = (HttpURLConnection) realUrl.openConnection(); + + // connection setting + connection.setRequestMethod(method); + connection.setDoOutput(isPostMethod); + connection.setDoInput(true); + connection.setUseCaches(false); + connection.setReadTimeout(5 * 1000); + connection.setConnectTimeout(3 * 1000); + connection.setRequestProperty("connection", "Keep-Alive"); + connection.setRequestProperty("Content-Type", "application/json;charset=UTF-8"); + connection.setRequestProperty("Accept-Charset", "application/json;charset=UTF-8"); + + // do connection + connection.connect(); + + // data + if (isPostMethod && data!=null && data.trim().length()>0) { + DataOutputStream dataOutputStream = new DataOutputStream(connection.getOutputStream()); + dataOutputStream.write(data.getBytes("UTF-8")); + dataOutputStream.flush(); + dataOutputStream.close(); + } + + // valid StatusCode + int statusCode = connection.getResponseCode(); + if (statusCode != 200) { + throw new RuntimeException("Http Request StatusCode(" + statusCode + ") Invalid."); + } + + // result + bufferedReader = new BufferedReader(new InputStreamReader(connection.getInputStream(), "UTF-8")); + StringBuilder result = new StringBuilder(); + String line; + while ((line = bufferedReader.readLine()) != null) { + result.append(line); + } + String responseMsg = result.toString(); + + XxlJobHelper.log(responseMsg); + + return; + } catch (Exception e) { + XxlJobHelper.log(e); + + XxlJobHelper.handleFail(); + return; + } finally { + try { + if (bufferedReader != null) { + bufferedReader.close(); + } + if (connection != null) { + connection.disconnect(); + } + } catch (Exception e2) { + XxlJobHelper.log(e2); + } + } + + } + + /** + * 5、生命周期任务示例:任务初始化与销毁时,支持自定义相关逻辑; + */ + @XxlJob(value = "demoJobHandler2", init = "init", destroy = "destroy") + public void demoJobHandler2() throws Exception { + XxlJobHelper.log("XXL-JOB, Hello World."); + } + public void init(){ + logger.info("init"); + } + public void destroy(){ + logger.info("destroy"); + } + + +} diff --git a/wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/src/main/resources/log4j.xml b/wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/src/main/resources/log4j.xml new file mode 100644 index 00000000..da3a9721 --- /dev/null +++ b/wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/src/main/resources/log4j.xml @@ -0,0 +1,27 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/src/main/resources/xxl-job-executor.properties b/wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/src/main/resources/xxl-job-executor.properties new file mode 100644 index 00000000..d5670185 --- /dev/null +++ b/wanhua-executor-plugins/wanhua-frameless-stationv3-collector-plugin/src/main/resources/xxl-job-executor.properties @@ -0,0 +1,17 @@ +### xxl-job admin address list, such as "http://address" or "http://address01,http://address02" +xxl.job.admin.addresses=http://172.22.62.105/internal/etl/xxl-job-admin + +### xxl-job, access token +xxl.job.accessToken=default_token + +### xxl-job executor appname +xxl.job.executor.appname=qpsy-interbase2 +### xxl-job executor registry-address: default use address to registry , otherwise use ip:port if address is null +xxl.job.executor.address=http://172.22.62.105 +### xxl-job executor server-info +xxl.job.executor.ip= +xxl.job.executor.port=19998 +### xxl-job executor log-path +xxl.job.executor.logpath=logs/xxl-job/jobhandler +### xxl-job executor log-retention-days +xxl.job.executor.logretentiondays=30 \ No newline at end of file diff --git a/xxl-job-admin/src/main/resources/application.properties b/xxl-job-admin/src/main/resources/application.properties index d438057d..ff2454a0 100644 --- a/xxl-job-admin/src/main/resources/application.properties +++ b/xxl-job-admin/src/main/resources/application.properties @@ -1,6 +1,7 @@ ### web -server.port=8080 -server.servlet.context-path=/xxl-job-admin +server.address=10.201.1.101 +server.port=18080 +server.servlet.context-path=/internal/etl/xxl-job-admin ### actuator management.server.servlet.context-path=/actuator @@ -24,9 +25,9 @@ mybatis.mapper-locations=classpath:/mybatis-mapper/*Mapper.xml #mybatis.type-aliases-package=com.xxl.job.admin.core.model ### xxl-job, datasource -spring.datasource.url=jdbc:mysql://127.0.0.1:3306/xxl_job?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&serverTimezone=Asia/Shanghai +spring.datasource.url=jdbc:mysql://10.201.1.90:3306/common_xxl_job?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&serverTimezone=Asia/Shanghai spring.datasource.username=root -spring.datasource.password=root_pwd +spring.datasource.password=dbmysql1205 spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver ### datasource-pool diff --git a/xxl-job-admin/src/main/resources/logback.xml b/xxl-job-admin/src/main/resources/logback.xml index d4b08c24..35b07923 100644 --- a/xxl-job-admin/src/main/resources/logback.xml +++ b/xxl-job-admin/src/main/resources/logback.xml @@ -2,7 +2,7 @@ logback - + diff --git a/xxl-job-core/pom.xml b/xxl-job-core/pom.xml index a363344d..5afda271 100644 --- a/xxl-job-core/pom.xml +++ b/xxl-job-core/pom.xml @@ -60,5 +60,16 @@ - - \ No newline at end of file + + + fdrepo + Release + https://public.hidotcom.cn/libabc/repository/fdrepo/ + + + fdrepo + Snapshot + https://public.hidotcom.cn/libabc/repository/fdrepo/ + + + diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/biz/client/ExecutorBizClient.java b/xxl-job-core/src/main/java/com/xxl/job/core/biz/client/ExecutorBizClient.java index 9f594309..a8ea80a9 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/biz/client/ExecutorBizClient.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/biz/client/ExecutorBizClient.java @@ -30,27 +30,27 @@ public class ExecutorBizClient implements ExecutorBiz { @Override public ReturnT beat() { - return XxlJobRemotingUtil.postBody(addressUrl+"beat", accessToken, timeout, "", String.class); + return XxlJobRemotingUtil.postBody(addressUrl+"internal/etl/wanhua/executor/beat", accessToken, timeout, "", String.class); } @Override public ReturnT idleBeat(IdleBeatParam idleBeatParam){ - return XxlJobRemotingUtil.postBody(addressUrl+"idleBeat", accessToken, timeout, idleBeatParam, String.class); + return XxlJobRemotingUtil.postBody(addressUrl+"internal/etl/wanhua/executor/idleBeat", accessToken, timeout, idleBeatParam, String.class); } @Override public ReturnT run(TriggerParam triggerParam) { - return XxlJobRemotingUtil.postBody(addressUrl + "run", accessToken, timeout, triggerParam, String.class); + return XxlJobRemotingUtil.postBody(addressUrl + "internal/etl/wanhua/executor/run", accessToken, timeout, triggerParam, String.class); } @Override public ReturnT kill(KillParam killParam) { - return XxlJobRemotingUtil.postBody(addressUrl + "kill", accessToken, timeout, killParam, String.class); + return XxlJobRemotingUtil.postBody(addressUrl + "internal/etl/wanhua/executor/kill", accessToken, timeout, killParam, String.class); } @Override public ReturnT log(LogParam logParam) { - return XxlJobRemotingUtil.postBody(addressUrl + "log", accessToken, timeout, logParam, LogResult.class); + return XxlJobRemotingUtil.postBody(addressUrl + "internal/etl/wanhua/executor/log", accessToken, timeout, logParam, LogResult.class); } } diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/server/EmbedServer.java b/xxl-job-core/src/main/java/com/xxl/job/core/server/EmbedServer.java index 540e0ea2..89f03693 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/server/EmbedServer.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/server/EmbedServer.java @@ -145,7 +145,12 @@ public class EmbedServer { // request parse //final byte[] requestBytes = ByteBufUtil.getBytes(msg.content()); // byteBuf.toString(io.netty.util.CharsetUtil.UTF_8); String requestData = msg.content().toString(CharsetUtil.UTF_8); - String uri = msg.uri(); + String fullUri = msg.uri(); + if (fullUri.startsWith("/internal/etl/wanhua/executor")) { + // 去掉前面的这一段 + fullUri = fullUri.substring("/internal/etl/wanhua/executor".length()); + } + String uri = fullUri; HttpMethod httpMethod = msg.method(); boolean keepAlive = HttpUtil.isKeepAlive(msg); String accessTokenReq = msg.headers().get(XxlJobRemotingUtil.XXL_JOB_ACCESS_TOKEN); diff --git a/xxl-job-executor-samples/xxl-job-executor-sample-frameless/src/main/resources/xxl-job-executor.properties b/xxl-job-executor-samples/xxl-job-executor-sample-frameless/src/main/resources/xxl-job-executor.properties index 9b1ab8a7..0706e953 100644 --- a/xxl-job-executor-samples/xxl-job-executor-sample-frameless/src/main/resources/xxl-job-executor.properties +++ b/xxl-job-executor-samples/xxl-job-executor-sample-frameless/src/main/resources/xxl-job-executor.properties @@ -1,5 +1,5 @@ ### xxl-job admin address list, such as "http://address" or "http://address01,http://address02" -xxl.job.admin.addresses=http://127.0.0.1:8080/xxl-job-admin +xxl.job.admin.addresses=https://10.201.1.101:8082/internal/etl/xxl-job-admin ### xxl-job, access token xxl.job.accessToken=default_token @@ -10,8 +10,8 @@ xxl.job.executor.appname=xxl-job-executor-sample xxl.job.executor.address= ### xxl-job executor server-info xxl.job.executor.ip= -xxl.job.executor.port=9998 +xxl.job.executor.port=19998 ### xxl-job executor log-path -xxl.job.executor.logpath=/data/applogs/xxl-job/jobhandler +xxl.job.executor.logpath=logs/xxl-job/jobhandler ### xxl-job executor log-retention-days xxl.job.executor.logretentiondays=30 \ No newline at end of file