xxl job for qpsy collector

pull/62/head
xieke 12 months ago
parent 9f5257ea59
commit 13f20a24f0

@ -50,6 +50,18 @@
</exclusions> </exclusions>
</dependency> </dependency>
<dependency>
<groupId>com.beust</groupId>
<artifactId>jcommander</artifactId>
<version>1.78</version>
</dependency>
<dependency>
<groupId>com.wanhua.sitedata.sync</groupId>
<artifactId>sitedata_sync_tools</artifactId>
<version>1.0-QP-SNAPSHOT</version>
</dependency>
</dependencies> </dependencies>

@ -0,0 +1,90 @@
package com.wanhua.frameless.qp.stationv2.config;
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParameterException;
import lombok.Data;
@Data
public class Argument {
@Parameter(
names = "--stationId",
description = "station Id to sync, using station name",
required = false
)
private String stationId;
@Parameter(
names = "--city",
description = "city name to sync",
required = false
)
private String city;
@Parameter(
names = "--om",
description = "om name to sync",
required = false
)
private String om;
@Parameter(
names = "--tm",
description = "tm name to sync",
required = false
)
private String tm;
@Parameter(
names = "--startTime",
description = "startTime to sync, yyyy-MM-dd",
required = false
)
private String startTime;
@Parameter(
names = "--endTime",
description = "endTime to sync, yyyy-MM-dd",
required = false
)
private String endTime;
@Parameter(
names = "--clearDataFirst",
description = "clear data first",
required = false
)
private boolean clearDataFirst;
public static Argument parseAgument(String args) throws ParameterException {
Argument jArgs = new Argument();
JCommander cmd = JCommander.newBuilder()
.addObject(jArgs)
.build();
cmd.parse(args);
if (jArgs.stationId == null && jArgs.city == null && jArgs.om == null && jArgs.tm == null) {
throw new ParameterException("stationId or city or om or tm must be set");
}
if (jArgs.stationId != null && (jArgs.city != null || jArgs.om != null || jArgs.tm != null)) {
throw new ParameterException("stationId and city or om or tm can not be set together");
}
if (jArgs.city != null && (jArgs.om != null || jArgs.tm != null)) {
throw new ParameterException("city and om or tm can not be set together");
}
if (jArgs.om != null && jArgs.tm != null) {
throw new ParameterException("om and tm can not be set together");
}
if (jArgs.startTime != null && jArgs.endTime == null || jArgs.endTime != null && jArgs.startTime == null) {
throw new ParameterException("startTime or endTime must be set together");
}
return jArgs;
}
}

@ -1,5 +1,18 @@
package com.wanhua.frameless.qp.stationv2.jobhandler; package com.wanhua.frameless.qp.stationv2.jobhandler;
import com.beust.jcommander.ParameterException;
import com.wanhua.bigdata.kafka.service.IStationShiftControlService;
import com.wanhua.bigdata.kafka.service.impl.StationShiftControlServiceImpl;
import com.wanhua.frameless.qp.stationv2.config.Argument;
import com.wanhua.pump_tank_error_detect_base_dao_service.service.*;
import com.wanhua.pump_tank_error_detect_base_dao_service.service.impl.*;
import com.wanhua.sitedata.sync.CommonShiftDataJob;
import com.wanhua.sygl.service.service.IQpsyDeptService;
import com.wanhua.sygl.service.service.ISyglOriginalDetailService;
import com.wanhua.sygl.service.service.impl.QpsyDeptServiceImpl;
import com.wanhua.sygl.service.service.impl.QpsyOracleServiceImpl;
import com.wanhua.utils.mybaits.CommonMybatisHelper;
import com.wanhua.utils.mybaits.GetCommonMybatisUtils;
import com.xxl.job.core.context.XxlJobHelper; import com.xxl.job.core.context.XxlJobHelper;
import com.xxl.job.core.handler.annotation.XxlJob; import com.xxl.job.core.handler.annotation.XxlJob;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -11,8 +24,11 @@ import java.io.DataOutputStream;
import java.io.InputStreamReader; import java.io.InputStreamReader;
import java.net.HttpURLConnection; import java.net.HttpURLConnection;
import java.net.URL; import java.net.URL;
import java.text.SimpleDateFormat;
import java.util.Arrays; import java.util.Arrays;
import java.util.Date;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
/** /**
* XxlJobBean * XxlJobBean
@ -29,16 +45,95 @@ public class QpStationV2CollectorJob {
private static Logger logger = LoggerFactory.getLogger(QpStationV2CollectorJob.class); private static Logger logger = LoggerFactory.getLogger(QpStationV2CollectorJob.class);
Consumer<String> getPrintlog() {
return new Consumer<String>() {
@Override
public void accept(String s) {
XxlJobHelper.log(s);
}
};
}
Consumer<Throwable> getPrintthrowable() {
return new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) {
XxlJobHelper.log(throwable);
}
};
}
/** /**
* 1Bean * 1Bean
*/ */
@XxlJob("demoJobHandler") @XxlJob("qp_collector")
public void demoJobHandler() throws Exception { public void demoJobHandler() throws Exception {
XxlJobHelper.log("XXL-JOB, Hello World."); XxlJobHelper.log("QP Collector begin.");
String param = XxlJobHelper.getJobParam();
Argument argument = null;
try {
argument = Argument.parseAgument(param);
} catch (ParameterException e) {
XxlJobHelper.log("parse argument failed ");
XxlJobHelper.log(e);
XxlJobHelper.handleFail();
return;
}
CommonMybatisHelper instance = GetCommonMybatisUtils.getMybatisUtilsByEnv("sygl");
CommonMybatisHelper oracleTradeInstance = GetCommonMybatisUtils.getMybatisUtilsByEnv("qporacle");
IStationService stationService = new StationServiceImpl(instance);
IDetectSetsService detectSetsService = new DetectSetsServiceImpl(instance);
IShiftTankDeliveryService shiftTankDeliveryService = new ShiftTankDeliveryServiceImpl(instance);
IPumpRelationService pumpRelationService = new PumpRelationService(instance);
IPumpService pumpService = new PumpService(instance);
IWeatherTemperatureDataService weatherTemperatureDataService = new WeatherTemperatureDataServiceImpl(instance);
IStationShiftControlService shiftControlService = new StationShiftControlServiceImpl(instance);
IQpsyDeptService qpsyDeptService = new QpsyDeptServiceImpl(instance);
ISyglOriginalDetailService syglOriginalDetailService = new QpsyOracleServiceImpl(oracleTradeInstance);
ILiquidLevelService liquidLevelService = new LiquidLevelService(instance);
IStationDataCaptureStatusService stationDataCaptureStatusService = new StationDataCaptureStatusServiceImpl(instance);
if (argument.getStationId() == null) {
XxlJobHelper.log("stationId is null, exit");
return;
}
String priorityStations = argument.getStationId();
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
Date startFetchDate = sdf.parse(sdf.format(new Date()));
Date endFetchDate = startFetchDate;
if (argument.getStartTime() != null && argument.getEndTime() != null) {
startFetchDate = sdf.parse(argument.getStartTime());
endFetchDate = sdf.parse(argument.getEndTime());
}
for (int i = 0; i < 5; i++) { CommonShiftDataJob job = new CommonShiftDataJob(
XxlJobHelper.log("beat at:" + i); priorityStations,
TimeUnit.SECONDS.sleep(2); stationService,
detectSetsService,
shiftTankDeliveryService,
pumpRelationService,
null,
pumpService,
stationDataCaptureStatusService,
qpsyDeptService,
syglOriginalDetailService,
liquidLevelService,
shiftControlService,
startFetchDate,
endFetchDate,
getPrintlog(),
getPrintthrowable());
try {
job.run();
} catch (Exception e) {
XxlJobHelper.log("collector failed");
XxlJobHelper.log(e);
XxlJobHelper.handleFail();
return;
} }
// default success // default success
} }

Loading…
Cancel
Save