重要重构:

1、调度中心回调逻辑优化, 过滤非法请求;
2、公共依赖中新增DBGlueLoader,基于原生jdbc实现GLUE源码的加载器,减少第三方依赖(mybatis,spring-orm等);
3、精简和优化执行器测配置(针对GLUE任务),降低上手难度
v1.5
xueli.xue 8 years ago
parent 8b2d0e42c4
commit 1fd34a2091

@ -1,6 +1,5 @@
package com.xxl.job.admin.core.callback; package com.xxl.job.admin.core.callback;
import com.xxl.job.admin.core.model.ReturnT;
import com.xxl.job.admin.core.model.XxlJobInfo; import com.xxl.job.admin.core.model.XxlJobInfo;
import com.xxl.job.admin.core.model.XxlJobLog; import com.xxl.job.admin.core.model.XxlJobLog;
import com.xxl.job.admin.core.util.DynamicSchedulerUtil; import com.xxl.job.admin.core.util.DynamicSchedulerUtil;
@ -35,65 +34,80 @@ public class XxlJobLogCallbackServerHandler extends AbstractHandler {
// parse hex-json to request model // parse hex-json to request model
String requestHex = httpServletRequest.getParameter(XxlJobNetCommUtil.HEX); String requestHex = httpServletRequest.getParameter(XxlJobNetCommUtil.HEX);
// do biz
ResponseModel responseModel = dobiz(requestHex);
// format response model to hex-json
String responseHex = XxlJobNetCommUtil.formatObj2HexJson(responseModel);
// response
httpServletResponse.setContentType("text/html;charset=utf-8");
httpServletResponse.setStatus(HttpServletResponse.SC_OK);
baseRequest.setHandled(true);
httpServletResponse.getWriter().println(responseHex);
}
private ResponseModel dobiz(String requestHex){
// valid hex
if (requestHex==null || requestHex.trim().length()==0) {
return new ResponseModel(ResponseModel.FAIL, "request hex is null.");
}
// valid request model
RequestModel requestModel = XxlJobNetCommUtil.parseHexJson2Obj(requestHex, RequestModel.class); RequestModel requestModel = XxlJobNetCommUtil.parseHexJson2Obj(requestHex, RequestModel.class);
if (requestModel==null) {
return new ResponseModel(ResponseModel.FAIL, "request hex parse fail.");
}
// process // valid log item
ResponseModel responseModel = null;
XxlJobLog log = DynamicSchedulerUtil.xxlJobLogDao.load(requestModel.getLogId()); XxlJobLog log = DynamicSchedulerUtil.xxlJobLogDao.load(requestModel.getLogId());
if (log!=null) { if (log == null) {
return new ResponseModel(ResponseModel.FAIL, "log item not found.");
// trigger success, to trigger child job, and avoid repeat trigger child job }
String childTriggerMsg = null;
if (ResponseModel.SUCCESS.equals(requestModel.getStatus()) && !ResponseModel.SUCCESS.equals(log.getHandleStatus())) { // trigger success, to trigger child job, and avoid repeat trigger child job
XxlJobInfo xxlJobInfo = DynamicSchedulerUtil.xxlJobInfoDao.load(log.getJobGroup(), log.getJobName()); String childTriggerMsg = null;
if (xxlJobInfo!=null && StringUtils.isNotBlank(xxlJobInfo.getChildJobKey())) { if (ResponseModel.SUCCESS.equals(requestModel.getStatus()) && !ResponseModel.SUCCESS.equals(log.getHandleStatus())) {
childTriggerMsg = "<hr>"; XxlJobInfo xxlJobInfo = DynamicSchedulerUtil.xxlJobInfoDao.load(log.getJobGroup(), log.getJobName());
String[] childJobKeys = xxlJobInfo.getChildJobKey().split(","); if (xxlJobInfo!=null && StringUtils.isNotBlank(xxlJobInfo.getChildJobKey())) {
for (int i = 0; i < childJobKeys.length; i++) { childTriggerMsg = "<hr>";
String[] jobKeyArr = childJobKeys[i].split("_"); String[] childJobKeys = xxlJobInfo.getChildJobKey().split(",");
if (jobKeyArr!=null && jobKeyArr.length==2) { for (int i = 0; i < childJobKeys.length; i++) {
XxlJobInfo childJobInfo = DynamicSchedulerUtil.xxlJobInfoDao.load(jobKeyArr[0], jobKeyArr[1]); String[] jobKeyArr = childJobKeys[i].split("_");
if (childJobInfo!=null) { if (jobKeyArr!=null && jobKeyArr.length==2) {
try { XxlJobInfo childJobInfo = DynamicSchedulerUtil.xxlJobInfoDao.load(jobKeyArr[0], jobKeyArr[1]);
boolean ret = DynamicSchedulerUtil.triggerJob(childJobInfo.getJobName(), childJobInfo.getJobGroup()); if (childJobInfo!=null) {
try {
// add msg boolean ret = DynamicSchedulerUtil.triggerJob(childJobInfo.getJobName(), childJobInfo.getJobGroup());
childTriggerMsg += MessageFormat.format("<br> {0}/{1} 触发子任务成功, 子任务Key: {2}, status: {3}, 子任务描述: {4}",
(i+1), childJobKeys.length, childJobKeys[i], ret, childJobInfo.getJobDesc()); // add msg
} catch (SchedulerException e) { childTriggerMsg += MessageFormat.format("<br> {0}/{1} 触发子任务成功, 子任务Key: {2}, status: {3}, 子任务描述: {4}",
logger.error("", e); (i+1), childJobKeys.length, childJobKeys[i], ret, childJobInfo.getJobDesc());
} } catch (SchedulerException e) {
} else { logger.error("", e);
childTriggerMsg += MessageFormat.format("<br> {0}/{1} 触发子任务失败, 子任务xxlJobInfo不存在, 子任务Key: {2}",
(i+1), childJobKeys.length, childJobKeys[i]);
} }
} else { } else {
childTriggerMsg += MessageFormat.format("<br> {0}/{1} 触发子任务失败, 子任务Key格式错误, 子任务Key: {2}", childTriggerMsg += MessageFormat.format("<br> {0}/{1} 触发子任务失败, 子任务xxlJobInfo不存在, 子任务Key: {2}",
(i+1), childJobKeys.length, childJobKeys[i]); (i+1), childJobKeys.length, childJobKeys[i]);
} }
} else {
childTriggerMsg += MessageFormat.format("<br> {0}/{1} 触发子任务失败, 子任务Key格式错误, 子任务Key: {2}",
(i+1), childJobKeys.length, childJobKeys[i]);
} }
} }
}
// save log }
log.setHandleTime(new Date());
log.setHandleStatus(requestModel.getStatus());
log.setHandleMsg(requestModel.getMsg() + childTriggerMsg);
DynamicSchedulerUtil.xxlJobLogDao.updateHandleInfo(log);
responseModel = new ResponseModel(ResponseModel.SUCCESS, null);
} else {
responseModel = new ResponseModel(ResponseModel.FAIL, "log item not found.");
} }
// format response model to hex-json // success, save log
String responseHex = XxlJobNetCommUtil.formatObj2HexJson(responseModel); log.setHandleTime(new Date());
log.setHandleStatus(requestModel.getStatus());
log.setHandleMsg(requestModel.getMsg() + childTriggerMsg);
DynamicSchedulerUtil.xxlJobLogDao.updateHandleInfo(log);
// response return new ResponseModel(ResponseModel.SUCCESS, null);
httpServletResponse.setContentType("text/html;charset=utf-8");
httpServletResponse.setStatus(HttpServletResponse.SC_OK);
baseRequest.setHandled(true);
httpServletResponse.getWriter().println(responseHex);
} }
} }

@ -0,0 +1,30 @@
package com.xxl.job.core.glue.loader.impl;
import com.xxl.job.core.glue.loader.GlueLoader;
import com.xxl.job.core.util.DBUtil;
import javax.sql.DataSource;
import java.util.List;
import java.util.Map;
/**
* Created by xuxueli on 16/9/30.
*/
public class DbGlueLoader implements GlueLoader {
private DataSource dataSource;
public void setDataSource(DataSource dataSource) {
this.dataSource = dataSource;
}
@Override
public String load(String job_group, String job_name) {
String sql = "SELECT glue_source FROM XXL_JOB_QRTZ_TRIGGER_INFO WHERE job_group = ? AND job_name = ?";
List<Map<String, Object>> result = DBUtil.query(dataSource, sql, new String[]{job_group, job_name});
if (result!=null && result.size()==1 && result.get(0)!=null && result.get(0).get("glue_source")!=null ) {
return (String) result.get(0).get("glue_source");
}
return null;
}
}

@ -0,0 +1,128 @@
package com.xxl.job.core.util;
import javax.sql.DataSource;
import java.sql.*;
import java.util.*;
/**
* Created by xuxueli on 16/9/30.
*/
public class DBUtil {
private static Connection getConn(DataSource dataSource) {
try {
return dataSource.getConnection();
} catch (SQLException e) {
e.printStackTrace();
}
return null;
}
/**
* update
*
* @param dataSource
* @param sql
* @param params
*/
public static int update(DataSource dataSource, String sql, Object params[]) {
Connection connection = getConn(dataSource);
PreparedStatement preparedStatement = null;
int ret = 0;
try {
preparedStatement = connection.prepareStatement(sql);
if (params != null) {
for (int i = 0; i < params.length; i++) {
preparedStatement.setObject(i + 1, params[i]);
}
}
ret = preparedStatement.executeUpdate();
} catch (SQLException e) {
e.printStackTrace();
} finally {
release(connection, preparedStatement, null);
}
return ret;
}
/**
* query
*
* @param dataSource
* @param sql
* @param params
* @return
*/
public static List<Map<String, Object>> query(DataSource dataSource, String sql, Object[] params) {
Connection connection = getConn(dataSource);
PreparedStatement preparedStatement = null;
ResultSet resultSet = null;
try {
preparedStatement = connection.prepareStatement(sql);
if (params != null) {
for (int i = 0; i < params.length; i++) {
preparedStatement.setObject(i + 1, params[i]);
}
}
resultSet = preparedStatement.executeQuery();
List<Map<String, Object>> ret = resultSetToList(resultSet);
return ret;
} catch (SQLException e) {
e.printStackTrace();
} finally {
release(connection, preparedStatement, resultSet);
}
return null;
}
private static List<Map<String, Object>> resultSetToList(ResultSet resultSet) throws SQLException {
if (resultSet == null) {
return Collections.EMPTY_LIST;
}
ResultSetMetaData resultSetMetaData = resultSet.getMetaData(); // 得到结果集(rs)的结构信息,比如字段数、字段名等
int columnCount = resultSetMetaData.getColumnCount(); // 返回此 ResultSet 对象中的列数
List<Map<String, Object>> list = new ArrayList<Map<String, Object>>();
while (resultSet.next()) {
Map<String, Object> rowData = new HashMap<String, Object>(columnCount);
for (int i = 1; i <= columnCount; i++) {
rowData.put(resultSetMetaData.getColumnName(i), resultSet.getObject(i));
}
list.add(rowData);
}
return list;
}
/**
* release
* @param connection
* @param preparedStatement
* @param resultSet
*/
public static void release(Connection connection, PreparedStatement preparedStatement, ResultSet resultSet) {
if (resultSet != null) {
try {
resultSet.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
if (preparedStatement != null) {
try {
preparedStatement.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
if (connection != null) {
try {
connection.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
}
}

@ -21,29 +21,7 @@
<artifactId>spring-webmvc</artifactId> <artifactId>spring-webmvc</artifactId>
<version>${spring.version}</version> <version>${spring.version}</version>
</dependency> </dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context-support</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-orm</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>${spring.version}</version>
</dependency>
<!-- springframe end -->
<!-- aspectjweaver (support spring aop) -->
<dependency>
<groupId>org.aspectj</groupId>
<artifactId>aspectjweaver</artifactId>
<version>1.8.7</version>
</dependency>
<!-- slf4j --> <!-- slf4j -->
<dependency> <dependency>
<groupId>org.slf4j</groupId> <groupId>org.slf4j</groupId>
@ -57,17 +35,6 @@
<artifactId>c3p0</artifactId> <artifactId>c3p0</artifactId>
<version>0.9.1.2</version> <version>0.9.1.2</version>
</dependency> </dependency>
<!-- mybatis-spring -->
<dependency>
<groupId>org.mybatis</groupId>
<artifactId>mybatis-spring</artifactId>
<version>1.2.2</version>
</dependency>
<dependency>
<groupId>org.mybatis</groupId>
<artifactId>mybatis</artifactId>
<version>3.2.8</version>
</dependency>
<!-- mysql-connector --> <!-- mysql-connector -->
<dependency> <dependency>
<groupId>mysql</groupId> <groupId>mysql</groupId>

@ -1,27 +0,0 @@
package com.xxl.job.executor.loader;
import javax.annotation.Resource;
import org.springframework.stereotype.Service;
import com.xxl.job.core.glue.loader.GlueLoader;
import com.xxl.job.executor.loader.dao.IXxlJobInfoDao;
import com.xxl.job.executor.loader.dao.model.XxlJobInfo;
/**
* GLUE RPC
* @author xuxueli
*/
@Service("dbGlueLoader")
public class DbGlueLoader implements GlueLoader {
@Resource
private IXxlJobInfoDao xxlJobInfoDao;
@Override
public String load(String job_group, String job_name) {
XxlJobInfo glue = xxlJobInfoDao.load(job_group, job_name);
return glue!=null?glue.getGlueSource():null;
}
}

@ -1,13 +0,0 @@
package com.xxl.job.executor.loader.dao;
import com.xxl.job.executor.loader.dao.model.XxlJobInfo;
/**
* job log for glue
* @author xuxueli 2016-5-19 18:04:56
*/
public interface IXxlJobInfoDao {
public XxlJobInfo load(String jobGroup, String jobName);
}

@ -1,33 +0,0 @@
package com.xxl.job.executor.loader.dao.impl;
import java.util.HashMap;
import javax.annotation.Resource;
import org.mybatis.spring.SqlSessionTemplate;
import org.springframework.stereotype.Repository;
import com.xxl.job.executor.loader.dao.IXxlJobInfoDao;
import com.xxl.job.executor.loader.dao.model.XxlJobInfo;
/**
* job log for glue
* @author xuxueli 2016-5-19 18:17:52
*/
@Repository
public class XxlJobInfoDaoImpl implements IXxlJobInfoDao {
@Resource
public SqlSessionTemplate sqlSessionTemplate;
@Override
public XxlJobInfo load(String jobGroup, String jobName) {
HashMap<String, Object> params = new HashMap<String, Object>();
params.put("jobGroup", jobGroup);
params.put("jobName", jobName);
return sqlSessionTemplate.selectOne("XxlJobInfoMapper.load", params);
}
}

@ -1,38 +0,0 @@
package com.xxl.job.executor.loader.dao.model;
/**
* xxl-job info
* @author xuxueli 2016-5-19 17:57:46
*/
public class XxlJobInfo {
private String jobGroup;
private String jobName;
private String glueSource;
public String getJobGroup() {
return jobGroup;
}
public void setJobGroup(String jobGroup) {
this.jobGroup = jobGroup;
}
public String getJobName() {
return jobName;
}
public void setJobName(String jobName) {
this.jobName = jobName;
}
public String getGlueSource() {
return glueSource;
}
public void setGlueSource(String glueSource) {
this.glueSource = glueSource;
}
}

@ -1,40 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd">
<bean id="propertyConfigurer" class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
<property name="fileEncoding" value="utf-8" />
<property name="locations">
<list>
<value>classpath*:jdbc.properties</value>
</list>
</property>
</bean>
<bean id="dataSource" class="com.mchange.v2.c3p0.ComboPooledDataSource" destroy-method="close">
<property name="driverClass" value="${c3p0.driverClass}" />
<property name="jdbcUrl" value="${c3p0.url}" />
<property name="user" value="${c3p0.user}" />
<property name="password" value="${c3p0.password}" />
<property name="initialPoolSize" value="3" />
<property name="minPoolSize" value="2" />
<property name="maxPoolSize" value="10" />
<property name="maxIdleTime" value="60" />
<property name="acquireRetryDelay" value="1000" />
<property name="acquireRetryAttempts" value="10" />
<property name="preferredTestQuery" value="SELECT 1" />
</bean>
<bean id="sqlSessionFactory" class="org.mybatis.spring.SqlSessionFactoryBean">
<property name="dataSource" ref="dataSource" />
<property name="mapperLocations" value="classpath*:mybatis-mapper/*.xml"/>
</bean>
<!-- scope must be "prototype" when junit -->
<bean id="sqlSessionTemplate" class="org.mybatis.spring.SqlSessionTemplate" scope="prototype">
<constructor-arg index="0" ref="sqlSessionFactory" />
</bean>
</beans>

@ -7,17 +7,51 @@
http://www.springframework.org/schema/context http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-3.0.xsd"> http://www.springframework.org/schema/context/spring-context-3.0.xsd">
<!-- scan job handler -->
<context:component-scan base-package="com.xxl.job.executor" /> <context:component-scan base-package="com.xxl.job.executor" />
<!-- 执行器 --> <!-- 执行器
……………………………………………………………………………………………………………………
port : 执行器端口号
-->
<bean id="xxlJobExecutor" class="com.xxl.job.core.executor.jetty.XxlJobExecutor" init-method="start" destroy-method="destroy" > <bean id="xxlJobExecutor" class="com.xxl.job.core.executor.jetty.XxlJobExecutor" init-method="start" destroy-method="destroy" >
<property name="port" value="9999" /> <property name="port" value="9999" />
</bean> </bean>
<!-- glue factory --> <!-- glue factory (开启GLUE任务时才需要, 否则可以删除)
……………………………………………………………………………………………………………………
cacheTimeout : GLUE任务示例缓存失效时间, 单位/ms
glueLoader : GLUE 代码加载器推荐将该服务配置成RPC服务
dataSource : XXL-JOB公共数据源
-->
<bean id="glueFactory" class="com.xxl.job.core.glue.GlueFactory"> <bean id="glueFactory" class="com.xxl.job.core.glue.GlueFactory">
<property name="cacheTimeout" value="10000" /> <property name="cacheTimeout" value="10000" />
<property name="glueLoader" ref="dbGlueLoader" /> <!-- GLUE 代码加载器推荐将该服务配置成RPC服务 --> <property name="glueLoader" >
<bean class="com.xxl.job.core.glue.loader.impl.DbGlueLoader" >
<property name="dataSource" ref="dataSource" />
</bean>
</property>
</bean>
<!-- XXL-JOB公共数据源 (开启GLUE任务时才需要, 否则可以删除) -->
<bean id="dataSource" class="com.mchange.v2.c3p0.ComboPooledDataSource" destroy-method="close">
<property name="driverClass" value="${c3p0.driverClass}" />
<property name="jdbcUrl" value="${c3p0.url}" />
<property name="user" value="${c3p0.user}" />
<property name="password" value="${c3p0.password}" />
<property name="initialPoolSize" value="3" />
<property name="minPoolSize" value="2" />
<property name="maxPoolSize" value="10" />
<property name="maxIdleTime" value="60" />
<property name="acquireRetryDelay" value="1000" />
<property name="acquireRetryAttempts" value="10" />
<property name="preferredTestQuery" value="SELECT 1" />
</bean>
<bean id="propertyConfigurer" class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
<property name="fileEncoding" value="utf-8" />
<property name="location">
<value>classpath:jdbc.properties</value>
</property>
</bean> </bean>
</beans> </beans>

@ -1,26 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="XxlJobInfoMapper">
<resultMap id="XxlJobInfo" type="com.xxl.job.executor.loader.dao.model.XxlJobInfo" >
<result column="job_group" property="jobGroup" />
<result column="job_name" property="jobName" />
<result column="glue_source" property="glueSource" />
</resultMap>
<sql id="Base_Column_List">
t.job_group,
t.job_name,
t.glue_source
</sql>
<select id="load" parameterType="java.util.HashMap" resultMap="XxlJobInfo">
SELECT <include refid="Base_Column_List" />
FROM XXL_JOB_QRTZ_TRIGGER_INFO AS t
WHERE t.job_group = #{jobGroup}
AND t.job_name = #{jobName}
</select>
</mapper>
Loading…
Cancel
Save