1.xxl-job创建执行器(不再硬编码)

2.修改表结构支持定时任务
pull/4/head
3y 3 years ago
parent ac6f1f148c
commit 0d246e34d5

@ -20,6 +20,11 @@ public class AustinConstant {
*/
public final static String YYYY_MM_DD = "yyyyMMdd";
/**
* cron
*/
public final static String CRON_FORMAT = "ss mm HH dd MM ? yyyy";
/**
* apollo

@ -9,7 +9,7 @@ package com.java3y.austin.constants;
public class XxlJobConstant {
/**
*
*
*/
public static final String LOGIN_URL = "/login";
public static final String INSERT_URL = "/jobinfo/add";
@ -19,9 +19,16 @@ public class XxlJobConstant {
public static final String STOP_URL = "/jobinfo/stop";
/**
*
*
*/
public static final String HANDLER_NAME = "austinJobHandler";
public static final String JOB_GROUP_PAGE_LIST = "/jobgroup/pageList";
public static final String JOB_GROUP_INSERT_URL = "/jobgroup/save";
/**
*
*/
public static final String JOB_HANDLER_NAME = "austinJob";
/**
*
@ -33,4 +40,9 @@ public class XxlJobConstant {
*/
public static final Integer RETRY_COUNT = 2;
/**
* ()
*/
public static final Integer DELAY_TIME = 5;
}

@ -0,0 +1,54 @@
package com.java3y.austin.entity;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.Accessors;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
/**
*
*
* @author 3y
*/
@Data
@Accessors(chain = true)
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class XxlJobGroup {
private int id;
private String appname;
private String title;
/**
* 0=1=
*/
private int addressType;
/**
* ()
*/
private String addressList;
private Date updateTime;
/**
* registry list ()
*/
private List<String> registryList;
public List<String> getRegistryList() {
if (addressList != null && addressList.trim().length() > 0) {
registryList = new ArrayList<String>(Arrays.asList(addressList.split(",")));
}
return registryList;
}
}

@ -20,38 +20,104 @@ import java.util.Date;
@Builder
public class XxlJobInfo implements Serializable {
private Integer id; // 主键ID
/**
* ID
*/
private Integer id;
private int jobGroup; // 执行器主键ID
/**
* ID
*/
private int jobGroup;
private String jobDesc;
private Date addTime;
private Date updateTime;
private String author; // 负责人
private String alarmEmail; // 报警邮件
/**
*
*/
private String author;
private String scheduleType; // 调度类型
private String scheduleConf; // 调度配置,值含义取决于调度类型
/**
*
*/
private String alarmEmail;
private String misfireStrategy; // 调度过期策略
/**
*
*/
private String scheduleType;
/**
*
*/
private String scheduleConf;
private String executorRouteStrategy; // 执行器路由策略
private String executorHandler; // 执行器任务Handler名称
private String executorParam; // 执行器,任务参数
private String executorBlockStrategy; // 阻塞处理策略
private int executorTimeout; // 任务执行超时时间,单位秒
private int executorFailRetryCount; // 失败重试次数
/**
*
*/
private String misfireStrategy;
private String glueType; // GLUE类型 #com.xxl.job.core.glue.GlueTypeEnum
private String glueSource; // GLUE源代码
private String glueRemark; // GLUE备注
private Date glueUpdatetime; // GLUE更新时间
/**
*
*/
private String executorRouteStrategy;
private String childJobId; // 子任务ID多个逗号分隔
/**
* Handler
*/
private String executorHandler;
/**
*
*/
private String executorParam;
private int triggerStatus; // 调度状态0-停止1-运行
private long triggerLastTime; // 上次调度时间
private long triggerNextTime; // 下次调度时间
/**
*
*/
private String executorBlockStrategy;
/**
*
*/
private int executorTimeout;
/**
*
*/
private int executorFailRetryCount;
/**
* GLUE #com.xxl.job.core.glue.GlueTypeEnum
*/
private String glueType;
/**
* GLUE
*/
private String glueSource;
/**
* GLUE
*/
private String glueRemark;
/**
* GLUE
*/
private Date glueUpdatetime;
/**
* ID
*/
private String childJobId;
/**
* 0-1-
*/
private int triggerStatus;
/**
*
*/
private long triggerLastTime;
/**
*
*/
private long triggerNextTime;
}

@ -1,19 +1,28 @@
package com.java3y.austin.handler;
import com.alibaba.fastjson.JSON;
import com.java3y.austin.domain.MessageTemplate;
import com.xxl.job.core.context.XxlJobHelper;
import com.xxl.job.core.handler.annotation.XxlJob;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
/**
*
* @author 3y
*/
@Service
@Slf4j
public class CronTaskHandler {
/**
*
* austin
*/
@XxlJob("austinJobHandler")
@XxlJob("austinJob")
public void execute() {
log.info("XXL-JOB, Hello World.");
MessageTemplate messageTemplate = JSON.parseObject(XxlJobHelper.getJobParam(), MessageTemplate.class);
}
}

@ -1,5 +1,6 @@
package com.java3y.austin.service;
import com.java3y.austin.entity.XxlJobGroup;
import com.java3y.austin.entity.XxlJobInfo;
import com.java3y.austin.vo.BasicResultVO;
@ -39,4 +40,16 @@ public interface CronTaskService {
BasicResultVO stopCronTask(Integer taskId);
/**
* Id
*
* @return
*/
BasicResultVO getGroupId(String appName, String title);
/**
*
*/
BasicResultVO createGroup(XxlJobGroup xxlJobGroup);
}

@ -5,6 +5,7 @@ import cn.hutool.http.HttpRequest;
import cn.hutool.http.HttpResponse;
import com.alibaba.fastjson.JSON;
import com.java3y.austin.constants.XxlJobConstant;
import com.java3y.austin.entity.XxlJobGroup;
import com.java3y.austin.entity.XxlJobInfo;
import com.java3y.austin.enums.RespStatusEnum;
import com.java3y.austin.service.CronTaskService;
@ -38,39 +39,25 @@ public class CronTaskServiceImpl implements CronTaskService {
@Override
public BasicResultVO saveCronTask(XxlJobInfo xxlJobInfo) {
Map<String, Object> params = JSON.parseObject(JSON.toJSONString(xxlJobInfo), Map.class);
String path;
if (xxlJobInfo.getId() == null) {
path = xxlAddresses + XxlJobConstant.INSERT_URL;
} else {
path = xxlAddresses + XxlJobConstant.UPDATE_URL;
}
//XxlJobInfo.builder().jobGroup(1).jobDesc()
// Map<String, Object> paramMap = new HashMap<>();
// paramMap.put("jobGroup", 1);
// paramMap.put("jobDesc", "这是测试任务");
// paramMap.put("executorRouteStrategy", "FIRST");
// paramMap.put("cronGen_display", "* * * * * ? *");
// paramMap.put("scheduleConf", "* * * * * ? *");
// paramMap.put("year", "2");
// paramMap.put("misfireStrategy", "DO_NOTHING");
// paramMap.put("glueType", "BEAN");
// paramMap.put("schedule_conf_CRON", "* * * * * ? *");
// paramMap.put("executorHandler", "messageJob"); // 此处hander需提前在项目中定义
// paramMap.put("executorBlockStrategy", "SERIAL_EXECUTION");
// paramMap.put("executorTimeout", 0);
// paramMap.put("executorFailRetryCount", 1);
// paramMap.put("author", "admin");
// paramMap.put("glueRemark", "GLUE代码初始化");
// paramMap.put("triggerStatus", 1);
// paramMap.put("scheduleType", "CRON");
HttpResponse response = HttpRequest.post(path).form(params).cookie(getCookie()).execute();
if (!response.isOk()) {
log.error("TaskService#saveTask fail:{}", JSON.toJSONString(response.body()));
String path = xxlJobInfo.getId() == null ? xxlAddresses + XxlJobConstant.INSERT_URL
: xxlAddresses + XxlJobConstant.UPDATE_URL;
HttpResponse response = null;
try {
response = HttpRequest.post(path).form(params).cookie(getCookie()).execute();
// 插入时需要返回Id而更新时不需要
if (path.contains(XxlJobConstant.INSERT_URL) && response.isOk()) {
Integer taskId = Integer.parseInt(String.valueOf(JSON.parseObject(response.body()).get("content")));
return BasicResultVO.success(taskId);
} else if (response.isOk()) {
return BasicResultVO.success();
}
} catch (Exception e) {
log.error("CronTaskService#saveTask fail:{}", JSON.toJSONString(response.body()));
return BasicResultVO.fail(RespStatusEnum.SERVICE_ERROR, JSON.toJSONString(response.body()));
}
return BasicResultVO.success(JSON.parseObject(response.body()));
return BasicResultVO.fail(RespStatusEnum.SERVICE_ERROR, JSON.toJSONString(response.body()));
}
@Override
@ -80,7 +67,7 @@ public class CronTaskServiceImpl implements CronTaskService {
String path = xxlAddresses + XxlJobConstant.DELETE_URL;
HttpResponse response = HttpRequest.post(path).form(params).cookie(getCookie()).execute();
if (!response.isOk()) {
log.error("TaskService#deleteCronTask fail:{}", JSON.toJSONString(response.body()));
log.error("CronTaskService#deleteCronTask fail:{}", JSON.toJSONString(response.body()));
return BasicResultVO.fail(RespStatusEnum.SERVICE_ERROR, JSON.toJSONString(response.body()));
}
return BasicResultVO.success();
@ -90,10 +77,11 @@ public class CronTaskServiceImpl implements CronTaskService {
public BasicResultVO startCronTask(Integer taskId) {
HashMap<String, Object> params = new HashMap<>();
params.put("id", taskId);
String path = xxlAddresses + XxlJobConstant.RUN_URL;
HttpResponse response = HttpRequest.post(path).form(params).cookie(getCookie()).execute();
if (!response.isOk()) {
log.error("TaskService#startCronTask fail:{}", JSON.toJSONString(response.body()));
log.error("CronTaskService#startCronTask fail:{}", JSON.toJSONString(response.body()));
return BasicResultVO.fail(RespStatusEnum.SERVICE_ERROR, JSON.toJSONString(response.body()));
}
return BasicResultVO.success();
@ -107,7 +95,41 @@ public class CronTaskServiceImpl implements CronTaskService {
String path = xxlAddresses + XxlJobConstant.STOP_URL;
HttpResponse response = HttpRequest.post(path).form(params).cookie(getCookie()).execute();
if (!response.isOk()) {
log.error("TaskService#stopCronTask fail:{}", JSON.parseObject(response.body()));
log.error("CronTaskService#stopCronTask fail:{}", JSON.toJSONString(response.body()));
return BasicResultVO.fail(RespStatusEnum.SERVICE_ERROR, JSON.toJSONString(response.body()));
}
return BasicResultVO.success();
}
@Override
public BasicResultVO getGroupId(String appName, String title) {
String path = xxlAddresses + XxlJobConstant.JOB_GROUP_PAGE_LIST;
HashMap<String, Object> params = new HashMap<>();
params.put("appname", appName);
params.put("title", title);
HttpResponse response = null;
try {
response = HttpRequest.post(path).form(params).cookie(getCookie()).execute();
Integer id = JSON.parseObject(response.body()).getJSONArray("data").getJSONObject(0).getInteger("id");
if (response.isOk() && id != null) {
return BasicResultVO.success(id);
}
} catch (Exception e) {
log.error("CronTaskService#getGroupId fail:{}", JSON.toJSONString(response.body()));
return BasicResultVO.fail(RespStatusEnum.SERVICE_ERROR, JSON.toJSONString(response.body()));
}
return BasicResultVO.fail(RespStatusEnum.SERVICE_ERROR, JSON.toJSONString(response.body()));
}
@Override
public BasicResultVO createGroup(XxlJobGroup xxlJobGroup) {
Map<String, Object> params = JSON.parseObject(JSON.toJSONString(xxlJobGroup), Map.class);
String path = xxlAddresses + XxlJobConstant.JOB_GROUP_INSERT_URL;
HttpResponse response = HttpRequest.post(path).form(params).cookie(getCookie()).execute();
if (!response.isOk()) {
log.error("CronTaskService#createGroup fail:{}", JSON.toJSONString(response.body()));
return BasicResultVO.fail(RespStatusEnum.SERVICE_ERROR, JSON.toJSONString(response.body()));
}
return BasicResultVO.success();
@ -119,13 +141,12 @@ public class CronTaskServiceImpl implements CronTaskService {
* @return String
*/
private String getCookie() {
String path = xxlAddresses + XxlJobConstant.LOGIN_URL;
Map<String, Object> hashMap = new HashMap<>();
hashMap.put("userName", xxlUserName);
hashMap.put("password", xxlPassword);
hashMap.put("randomCode", IdUtil.fastSimpleUUID());
log.info("TaskService#getCookie params{}", hashMap);
String path = xxlAddresses + XxlJobConstant.LOGIN_URL;
HttpResponse response = HttpRequest.post(path).form(hashMap).execute();
if (response.isOk()) {
List<HttpCookie> cookies = response.getCookies();
@ -135,7 +156,7 @@ public class CronTaskServiceImpl implements CronTaskService {
}
return sb.toString();
}
log.error("TaskService#getCookie fail:{}", JSON.parseObject(response.body()));
log.error("CronTaskService#getCookie fail:{}", JSON.parseObject(response.body()));
return null;
}
}

@ -1,43 +1,61 @@
package com.java3y.austin.utils;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSON;
import com.java3y.austin.constant.AustinConstant;
import com.java3y.austin.constants.XxlJobConstant;
import com.java3y.austin.domain.MessageTemplate;
import com.java3y.austin.entity.XxlJobGroup;
import com.java3y.austin.entity.XxlJobInfo;
import com.java3y.austin.enums.*;
import com.java3y.austin.service.CronTaskService;
import com.java3y.austin.vo.BasicResultVO;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.util.Date;
/**
* xxlJob
*
* @author 3y
*/
@Component
public class XxlJobUtils {
@Value("${xxl.job.executor.appname}")
private String appName;
@Value("${xxl.job.executor.jobHandlerName}")
private String jobHandlerName;
@Autowired
private CronTaskService cronTaskService;
/**
* xxlJobInfo
*
* @param messageTemplate
* @return
*/
public static XxlJobInfo buildXxlJobInfo(MessageTemplate messageTemplate) {
// 判断是否为cron表达式
String scheduleConf = StrUtil.EMPTY;
String scheduleType = ScheduleTypeEnum.NONE.name();
if (!messageTemplate.getExpectPushTime().equals(String.valueOf(AustinConstant.FALSE))) {
scheduleType = ScheduleTypeEnum.CRON.name();
scheduleConf = messageTemplate.getExpectPushTime();
public XxlJobInfo buildXxlJobInfo(MessageTemplate messageTemplate) {
String scheduleConf = messageTemplate.getExpectPushTime();
// 如果没有指定cron表达式说明立即执行(给到xxl-job延迟5秒的cron表达式)
if (messageTemplate.getExpectPushTime().equals(String.valueOf(AustinConstant.FALSE))) {
scheduleConf = DateUtil.format(DateUtil.offsetSecond(new Date(), XxlJobConstant.DELAY_TIME), AustinConstant.CRON_FORMAT);
}
XxlJobInfo xxlJobInfo = XxlJobInfo.builder().jobGroup(1).jobDesc(messageTemplate.getName())
XxlJobInfo xxlJobInfo = XxlJobInfo.builder()
.jobGroup(queryJobGroupId()).jobDesc(messageTemplate.getName())
.author(messageTemplate.getCreator())
.scheduleConf(scheduleConf)
.scheduleType(scheduleType)
.scheduleType(ScheduleTypeEnum.CRON.name())
.misfireStrategy(MisfireStrategyEnum.DO_NOTHING.name())
.executorRouteStrategy(ExecutorRouteStrategyEnum.CONSISTENT_HASH.name())
.executorHandler(XxlJobConstant.HANDLER_NAME)
.executorHandler(XxlJobConstant.JOB_HANDLER_NAME)
.executorParam(JSON.toJSONString(messageTemplate))
.executorBlockStrategy(ExecutorBlockStrategyEnum.SERIAL_EXECUTION.name())
.executorTimeout(XxlJobConstant.TIME_OUT)
@ -54,4 +72,20 @@ public class XxlJobUtils {
}
return xxlJobInfo;
}
/**
* jobGroupId
* @return
*/
private Integer queryJobGroupId() {
BasicResultVO basicResultVO = cronTaskService.getGroupId(appName, jobHandlerName);
if (basicResultVO.getData() == null) {
XxlJobGroup xxlJobGroup = XxlJobGroup.builder().appname(appName).title(jobHandlerName).addressType(AustinConstant.FALSE).build();
if (RespStatusEnum.SUCCESS.getCode().equals(cronTaskService.createGroup(xxlJobGroup).getStatus())) {
return (int) cronTaskService.getGroupId(appName, jobHandlerName).getData();
}
}
return (Integer) basicResultVO.getData();
}
}

@ -55,6 +55,12 @@ public class MessageTemplate implements Serializable {
*/
private Integer cronTaskId;
/**
* ID
* 1. Id
* 2. IDId
*/
private String cronCrowdId;
/**
* Id

@ -52,6 +52,7 @@
<artifactId>springfox-boot-starter</artifactId>
</dependency>
</dependencies>
<build>

@ -115,7 +115,6 @@ public class MessageTemplateController {
Map<String, String> variables = JSON.parseObject(messageTemplateParam.getMsgContent(), Map.class);
MessageParam messageParam = MessageParam.builder().receiver(messageTemplateParam.getReceiver()).variables(variables).build();
SendRequest sendRequest = SendRequest.builder().code(BusinessCode.COMMON_SEND.getCode()).messageTemplateId(messageTemplateParam.getId()).messageParam(messageParam).build();
SendResponse response = sendService.send(sendRequest);
if (response.getCode() != RespStatusEnum.SUCCESS.getCode()) {
return BasicResultVO.fail(response.getMsg());
@ -130,9 +129,7 @@ public class MessageTemplateController {
@PostMapping("start/{id}")
@ApiOperation("/启动模板的定时任务")
public BasicResultVO start(@RequestBody @PathVariable("id") Long id) {
messageTemplateService.startCronTask(id);
return BasicResultVO.success();
return messageTemplateService.startCronTask(id);
}
/**
*
@ -140,8 +137,6 @@ public class MessageTemplateController {
@PostMapping("stop/{id}")
@ApiOperation("/暂停模板的定时任务")
public BasicResultVO stop(@RequestBody @PathVariable("id") Long id) {
messageTemplateService.stopCronTask(id);
return BasicResultVO.success();
return messageTemplateService.stopCronTask(id);
}
}

@ -3,13 +3,13 @@ package com.java3y.austin.service.impl;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSONObject;
import com.java3y.austin.constant.AustinConstant;
import com.java3y.austin.dao.MessageTemplateDao;
import com.java3y.austin.domain.MessageTemplate;
import com.java3y.austin.entity.XxlJobInfo;
import com.java3y.austin.enums.AuditStatus;
import com.java3y.austin.enums.MessageStatus;
import com.java3y.austin.enums.RespStatusEnum;
import com.java3y.austin.enums.TemplateType;
import com.java3y.austin.service.CronTaskService;
import com.java3y.austin.service.MessageTemplateService;
@ -38,6 +38,9 @@ public class MessageTemplateServiceImpl implements MessageTemplateService {
@Autowired
private CronTaskService cronTaskService;
@Autowired
private XxlJobUtils xxlJobUtils;
@Override
public List<MessageTemplate> queryList(MessageTemplateParam param) {
PageRequest pageRequest = PageRequest.of(param.getPage() - 1, param.getPerPage());
@ -86,20 +89,24 @@ public class MessageTemplateServiceImpl implements MessageTemplateService {
// 1.修改模板状态
MessageTemplate messageTemplate = messageTemplateDao.findById(id).get();
// 2.动态创建定时任务并启动
XxlJobInfo xxlJobInfo = XxlJobUtils.buildXxlJobInfo(messageTemplate);
// 2.动态创建或更新定时任务
XxlJobInfo xxlJobInfo = xxlJobUtils.buildXxlJobInfo(messageTemplate);
// 3.获取taskId(如果本身存在则复用原有任务如果不存在则得到新建后任务ID)
Integer taskId = messageTemplate.getCronTaskId();
BasicResultVO basicResultVO = cronTaskService.saveCronTask(xxlJobInfo);
JSONObject data = (JSONObject) basicResultVO.getData();
if (data.get("content") != null) {
cronTaskService.startCronTask(Integer.valueOf(String.valueOf(data.get("content"))));
MessageTemplate clone = ObjectUtil.clone(messageTemplate).setMsgStatus(MessageStatus.RUN.getCode()).setUpdated(Math.toIntExact(DateUtil.currentSeconds()));
if (taskId == null && RespStatusEnum.SUCCESS.getCode().equals(basicResultVO.getStatus()) && basicResultVO.getData() != null) {
taskId = Integer.valueOf(String.valueOf(basicResultVO.getData()));
}
// 4. 启动定时任务
if (taskId != null) {
cronTaskService.startCronTask(taskId);
MessageTemplate clone = ObjectUtil.clone(messageTemplate).setMsgStatus(MessageStatus.RUN.getCode()).setCronTaskId(taskId).setUpdated(Math.toIntExact(DateUtil.currentSeconds()));
messageTemplateDao.save(clone);
return BasicResultVO.success();
} else {
return BasicResultVO.fail();
}
return BasicResultVO.fail();
}
@Override
@ -132,7 +139,7 @@ public class MessageTemplateServiceImpl implements MessageTemplateService {
/**
* 1.
* 2.
* 2. ()
*
* @param messageTemplate
*/
@ -141,7 +148,7 @@ public class MessageTemplateServiceImpl implements MessageTemplateService {
.setMsgStatus(MessageStatus.INIT.getCode()).setAuditStatus(AuditStatus.WAIT_AUDIT.getCode());
if (messageTemplate.getCronTaskId() != null && TemplateType.CLOCKING.getCode().equals(messageTemplate.getTemplateType())) {
XxlJobInfo xxlJobInfo = XxlJobUtils.buildXxlJobInfo(messageTemplate);
XxlJobInfo xxlJobInfo = xxlJobUtils.buildXxlJobInfo(messageTemplate);
cronTaskService.saveCronTask(xxlJobInfo);
cronTaskService.stopCronTask(messageTemplate.getCronTaskId());
}

@ -1,7 +1,8 @@
create
database austin;
use austin;
use
austin;
CREATE TABLE `message_template`
@ -9,9 +10,10 @@ CREATE TABLE `message_template`
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`name` varchar(100) COLLATE utf8mb4_unicode_ci NOT NULL DEFAULT '' COMMENT '标题',
`audit_status` tinyint(4) NOT NULL DEFAULT '0' COMMENT '当前消息审核状态: 10.待审核 20.审核成功 30.被拒绝',
`flow_id` varchar(50) COLLATE utf8mb4_unicode_ci COMMENT '工单ID',
`flow_id` varchar(50) COLLATE utf8mb4_unicode_ci COMMENT '工单ID',
`msg_status` tinyint(4) NOT NULL DEFAULT '0' COMMENT '当前消息状态10.新建 20.停用 30.启用 40.等待发送 50.发送中 60.发送成功 70.发送失败',
`cron_task_id` bigint(20) COMMENT '定时任务Id (xxl-job-admin返回)',
`cron_crowd_id` varchar (50) COMMENT '定时发送人群ID',
`id_type` tinyint(4) NOT NULL DEFAULT '0' COMMENT '消息的发送ID类型10. userId 20.did 30.手机号 40.openId 50.email',
`send_channel` tinyint(4) NOT NULL DEFAULT '0' COMMENT '消息发送渠道10.IM 20.Push 30.短信 40.Email 50.公众号 60.小程序',
`template_type` tinyint(4) NOT NULL DEFAULT '0' COMMENT '10.运营类 20.技术类接口调用',

Loading…
Cancel
Save