diff --git a/wanhua-executor-plugins/wanhua-frameless-qp-stationv2-collector-plugin/pom.xml b/wanhua-executor-plugins/wanhua-frameless-qp-stationv2-collector-plugin/pom.xml index 19974e94..a74a3435 100644 --- a/wanhua-executor-plugins/wanhua-frameless-qp-stationv2-collector-plugin/pom.xml +++ b/wanhua-executor-plugins/wanhua-frameless-qp-stationv2-collector-plugin/pom.xml @@ -50,6 +50,18 @@ + + com.beust + jcommander + 1.78 + + + + com.wanhua.sitedata.sync + sitedata_sync_tools + 1.0-QP-SNAPSHOT + + diff --git a/wanhua-executor-plugins/wanhua-frameless-qp-stationv2-collector-plugin/src/main/java/com/wanhua/frameless/qp/stationv2/config/Argument.java b/wanhua-executor-plugins/wanhua-frameless-qp-stationv2-collector-plugin/src/main/java/com/wanhua/frameless/qp/stationv2/config/Argument.java new file mode 100644 index 00000000..c597db78 --- /dev/null +++ b/wanhua-executor-plugins/wanhua-frameless-qp-stationv2-collector-plugin/src/main/java/com/wanhua/frameless/qp/stationv2/config/Argument.java @@ -0,0 +1,90 @@ +package com.wanhua.frameless.qp.stationv2.config; + +import com.beust.jcommander.JCommander; +import com.beust.jcommander.Parameter; +import com.beust.jcommander.ParameterException; +import lombok.Data; + +@Data +public class Argument { + @Parameter( + names = "--stationId", + description = "station Id to sync, using station name", + required = false + ) + private String stationId; + + @Parameter( + names = "--city", + description = "city name to sync", + required = false + ) + private String city; + + @Parameter( + names = "--om", + description = "om name to sync", + required = false + ) + private String om; + + @Parameter( + names = "--tm", + description = "tm name to sync", + required = false + ) + private String tm; + + @Parameter( + names = "--startTime", + description = "startTime to sync, yyyy-MM-dd", + required = false + ) + private String startTime; + + + @Parameter( + names = "--endTime", + description = "endTime to sync, yyyy-MM-dd", + required = false + ) + private String endTime; + + @Parameter( + names = "--clearDataFirst", + description = "clear data first", + required = false + ) + private boolean clearDataFirst; + + + public static Argument parseAgument(String args) throws ParameterException { + Argument jArgs = new Argument(); + JCommander cmd = JCommander.newBuilder() + .addObject(jArgs) + .build(); + cmd.parse(args); + + if (jArgs.stationId == null && jArgs.city == null && jArgs.om == null && jArgs.tm == null) { + throw new ParameterException("stationId or city or om or tm must be set"); + } + + if (jArgs.stationId != null && (jArgs.city != null || jArgs.om != null || jArgs.tm != null)) { + throw new ParameterException("stationId and city or om or tm can not be set together"); + } + + if (jArgs.city != null && (jArgs.om != null || jArgs.tm != null)) { + throw new ParameterException("city and om or tm can not be set together"); + } + + if (jArgs.om != null && jArgs.tm != null) { + throw new ParameterException("om and tm can not be set together"); + } + + if (jArgs.startTime != null && jArgs.endTime == null || jArgs.endTime != null && jArgs.startTime == null) { + throw new ParameterException("startTime or endTime must be set together"); + } + return jArgs; + } + +} diff --git a/wanhua-executor-plugins/wanhua-frameless-qp-stationv2-collector-plugin/src/main/java/com/wanhua/frameless/qp/stationv2/jobhandler/QpStationV2CollectorJob.java b/wanhua-executor-plugins/wanhua-frameless-qp-stationv2-collector-plugin/src/main/java/com/wanhua/frameless/qp/stationv2/jobhandler/QpStationV2CollectorJob.java index bbd1942d..330ef123 100644 --- a/wanhua-executor-plugins/wanhua-frameless-qp-stationv2-collector-plugin/src/main/java/com/wanhua/frameless/qp/stationv2/jobhandler/QpStationV2CollectorJob.java +++ b/wanhua-executor-plugins/wanhua-frameless-qp-stationv2-collector-plugin/src/main/java/com/wanhua/frameless/qp/stationv2/jobhandler/QpStationV2CollectorJob.java @@ -1,5 +1,18 @@ package com.wanhua.frameless.qp.stationv2.jobhandler; +import com.beust.jcommander.ParameterException; +import com.wanhua.bigdata.kafka.service.IStationShiftControlService; +import com.wanhua.bigdata.kafka.service.impl.StationShiftControlServiceImpl; +import com.wanhua.frameless.qp.stationv2.config.Argument; +import com.wanhua.pump_tank_error_detect_base_dao_service.service.*; +import com.wanhua.pump_tank_error_detect_base_dao_service.service.impl.*; +import com.wanhua.sitedata.sync.CommonShiftDataJob; +import com.wanhua.sygl.service.service.IQpsyDeptService; +import com.wanhua.sygl.service.service.ISyglOriginalDetailService; +import com.wanhua.sygl.service.service.impl.QpsyDeptServiceImpl; +import com.wanhua.sygl.service.service.impl.QpsyOracleServiceImpl; +import com.wanhua.utils.mybaits.CommonMybatisHelper; +import com.wanhua.utils.mybaits.GetCommonMybatisUtils; import com.xxl.job.core.context.XxlJobHelper; import com.xxl.job.core.handler.annotation.XxlJob; import org.slf4j.Logger; @@ -11,8 +24,11 @@ import java.io.DataOutputStream; import java.io.InputStreamReader; import java.net.HttpURLConnection; import java.net.URL; +import java.text.SimpleDateFormat; import java.util.Arrays; +import java.util.Date; import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; /** * XxlJob开发示例(Bean模式) @@ -29,16 +45,95 @@ public class QpStationV2CollectorJob { private static Logger logger = LoggerFactory.getLogger(QpStationV2CollectorJob.class); + Consumer getPrintlog() { + return new Consumer() { + @Override + public void accept(String s) { + XxlJobHelper.log(s); + } + }; + } + + Consumer getPrintthrowable() { + return new Consumer() { + @Override + public void accept(Throwable throwable) { + XxlJobHelper.log(throwable); + } + }; + } /** * 1、简单任务示例(Bean模式) */ - @XxlJob("demoJobHandler") + @XxlJob("qp_collector") public void demoJobHandler() throws Exception { - XxlJobHelper.log("XXL-JOB, Hello World."); + XxlJobHelper.log("QP Collector begin."); + + String param = XxlJobHelper.getJobParam(); + Argument argument = null; + try { + argument = Argument.parseAgument(param); + } catch (ParameterException e) { + XxlJobHelper.log("parse argument failed "); + XxlJobHelper.log(e); + XxlJobHelper.handleFail(); + return; + } + + CommonMybatisHelper instance = GetCommonMybatisUtils.getMybatisUtilsByEnv("sygl"); + CommonMybatisHelper oracleTradeInstance = GetCommonMybatisUtils.getMybatisUtilsByEnv("qporacle"); + + IStationService stationService = new StationServiceImpl(instance); + IDetectSetsService detectSetsService = new DetectSetsServiceImpl(instance); + IShiftTankDeliveryService shiftTankDeliveryService = new ShiftTankDeliveryServiceImpl(instance); + IPumpRelationService pumpRelationService = new PumpRelationService(instance); + IPumpService pumpService = new PumpService(instance); + IWeatherTemperatureDataService weatherTemperatureDataService = new WeatherTemperatureDataServiceImpl(instance); + IStationShiftControlService shiftControlService = new StationShiftControlServiceImpl(instance); + IQpsyDeptService qpsyDeptService = new QpsyDeptServiceImpl(instance); + ISyglOriginalDetailService syglOriginalDetailService = new QpsyOracleServiceImpl(oracleTradeInstance); + ILiquidLevelService liquidLevelService = new LiquidLevelService(instance); + IStationDataCaptureStatusService stationDataCaptureStatusService = new StationDataCaptureStatusServiceImpl(instance); + + if (argument.getStationId() == null) { + XxlJobHelper.log("stationId is null, exit"); + return; + } + + String priorityStations = argument.getStationId(); + SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd"); + Date startFetchDate = sdf.parse(sdf.format(new Date())); + Date endFetchDate = startFetchDate; + if (argument.getStartTime() != null && argument.getEndTime() != null) { + startFetchDate = sdf.parse(argument.getStartTime()); + endFetchDate = sdf.parse(argument.getEndTime()); + } - for (int i = 0; i < 5; i++) { - XxlJobHelper.log("beat at:" + i); - TimeUnit.SECONDS.sleep(2); + CommonShiftDataJob job = new CommonShiftDataJob( + priorityStations, + stationService, + detectSetsService, + shiftTankDeliveryService, + pumpRelationService, + null, + pumpService, + stationDataCaptureStatusService, + qpsyDeptService, + syglOriginalDetailService, + liquidLevelService, + shiftControlService, + startFetchDate, + endFetchDate, + getPrintlog(), + getPrintthrowable()); + + try { + job.run(); + } catch (Exception e) { + XxlJobHelper.log("collector failed"); + XxlJobHelper.log(e); + XxlJobHelper.handleFail(); + return; } // default success }