任务调度中心调度锁逻辑优化,事务SQL下沉至Mapper层统一管理,并增加测试用例,提升代码可读性以及可维护性;

pull/72/head
xuxueli 2 months ago
parent 3f6a051a0a
commit 51792ffe45

@ -2573,9 +2573,9 @@ public void execute() {
- 1、【升级】升级多项maven依赖至较新版本如 netty、groovy、spring、spring-ai、dify 等;
- 2、【修复】合并PR-2369修复脚本任务参数取值问题
- 3、【优化】报表统计SQL优化修复小概率情况下查询null值问题
- 4、【重构】调度中心底层组件重构组件初始化以及销毁逻辑统一处理任务触发及和回调逻辑优化避免资源泄漏风险
- 5、【重构】调度中心底层组件模块化拆分移除组件单例以及静态代码逻辑提升组件可维护性
- 6、【ING】事务SQL下沉至Mapper统一管理降低维护成本
- 4、【重构】任务调度中心底层组件重构,组件初始化以及销毁逻辑统一处理,任务触发及和回调逻辑优化,避免资源泄漏风险;
- 5、【重构】任务调度中心底层组件模块化拆分,移除组件单例以及静态代码逻辑,提升组件可维护性;
- 6、【优化】任务调度中心调度锁逻辑优化事务SQL下沉至Mapper层统一管理并增加测试用例提升代码可读性以及可维护性
- 7、【ING】UI框架重构升级提升交互体验
- 8、【ING】调整资源加载逻辑移除不必要的拦截器逻辑提升页面加载效率
- 9、【ING】规范API交互协议通用响应结构体调整为Response

@ -0,0 +1,18 @@
package com.xxl.job.admin.mapper;
import org.apache.ibatis.annotations.Mapper;
/**
* job lock
*
* @author xuxueli 2016-1-12 18:03:45
*/
@Mapper
public interface XxlJobLockMapper {
/**
* get schedule lock
*/
String scheduleLock();
}

@ -9,6 +9,7 @@ import com.xxl.job.admin.util.I18nUtil;
import com.xxl.job.core.biz.ExecutorBiz;
import com.xxl.job.core.biz.client.ExecutorBizClient;
import com.xxl.job.core.enums.ExecutorBlockStrategyEnum;
import com.xxl.tool.core.StringTool;
import jakarta.annotation.Resource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -17,8 +18,8 @@ import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.mail.javamail.JavaMailSender;
import org.springframework.stereotype.Component;
import org.springframework.transaction.PlatformTransactionManager;
import javax.sql.DataSource;
import java.util.Arrays;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@ -149,7 +150,7 @@ public class XxlJobAdminBootstrap implements InitializingBean, DisposableBean {
private static ConcurrentMap<String, ExecutorBiz> executorBizRepository = new ConcurrentHashMap<String, ExecutorBiz>();
public static ExecutorBiz getExecutorBiz(String address) throws Exception {
// valid
if (address==null || address.trim().length()==0) {
if (StringTool.isBlank(address)) {
return null;
}
@ -206,9 +207,13 @@ public class XxlJobAdminBootstrap implements InitializingBean, DisposableBean {
@Resource
private XxlJobLogReportMapper xxlJobLogReportMapper;
@Resource
private XxlJobLockMapper xxlJobLockMapper;
@Resource
private JavaMailSender mailSender;
/*@Resource
private DataSource dataSource;*/
@Resource
private DataSource dataSource;
private PlatformTransactionManager transactionManager;
@Resource
private JobAlarmer jobAlarmer;
@Resource
@ -277,12 +282,20 @@ public class XxlJobAdminBootstrap implements InitializingBean, DisposableBean {
return xxlJobLogReportMapper;
}
public XxlJobLockMapper getXxlJobLockMapper() {
return xxlJobLockMapper;
}
public JavaMailSender getMailSender() {
return mailSender;
}
public DataSource getDataSource() {
/*public DataSource getDataSource() {
return dataSource;
}*/
public PlatformTransactionManager getTransactionManager() {
return transactionManager;
}
public JobAlarmer getJobAlarmer() {

@ -6,11 +6,12 @@ import com.xxl.job.admin.scheduler.cron.CronExpression;
import com.xxl.job.admin.scheduler.enums.MisfireStrategyEnum;
import com.xxl.job.admin.scheduler.enums.ScheduleTypeEnum;
import com.xxl.job.admin.scheduler.trigger.TriggerTypeEnum;
import com.xxl.tool.core.CollectionTool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.DefaultTransactionDefinition;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
@ -40,6 +41,7 @@ public class JobScheduleHelper {
@Override
public void run() {
// align time
try {
TimeUnit.MILLISECONDS.sleep(5000 - System.currentTimeMillis()%1000 );
} catch (Throwable e) {
@ -52,31 +54,24 @@ public class JobScheduleHelper {
// pre-read count: treadpool-size * trigger-qps (each trigger cost 50ms, qps = 1000/50 = 20)
int preReadCount = (XxlJobAdminBootstrap.getInstance().getTriggerPoolFastMax() + XxlJobAdminBootstrap.getInstance().getTriggerPoolSlowMax()) * 20;
// do schedule
while (!scheduleThreadToStop) {
// Scan Job
// param
long start = System.currentTimeMillis();
Connection conn = null;
Boolean connAutoCommit = null;
PreparedStatement preparedStatement = null;
boolean preReadSuc = true;
try {
conn = XxlJobAdminBootstrap.getInstance().getDataSource().getConnection();
connAutoCommit = conn.getAutoCommit();
conn.setAutoCommit(false);
preparedStatement = conn.prepareStatement( "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" );
preparedStatement.execute();
// tx start
// 1、pre read
// transaction start
TransactionStatus transactionStatus = XxlJobAdminBootstrap.getInstance().getTransactionManager().getTransaction(new DefaultTransactionDefinition());
try {
// 1、job lock
String lockedRecord = XxlJobAdminBootstrap.getInstance().getXxlJobLockMapper().scheduleLock();
long nowTime = System.currentTimeMillis();
// scan and process job
List<XxlJobInfo> scheduleList = XxlJobAdminBootstrap.getInstance().getXxlJobInfoMapper().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount);
if (scheduleList!=null && scheduleList.size()>0) {
if (CollectionTool.isNotEmpty(scheduleList)) {
// 2、push time-ring
for (XxlJobInfo jobInfo: scheduleList) {
@ -145,51 +140,15 @@ public class JobScheduleHelper {
preReadSuc = false;
}
// tx stop
} catch (Throwable e) {
if (!scheduleThreadToStop) {
logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread error:{}", e);
logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread error:{}", e.getMessage(), e);
}
} finally {
// commit
if (conn != null) {
try {
conn.commit();
} catch (Throwable e) {
if (!scheduleThreadToStop) {
logger.error(e.getMessage(), e);
}
}
try {
conn.setAutoCommit(connAutoCommit);
} catch (Throwable e) {
if (!scheduleThreadToStop) {
logger.error(e.getMessage(), e);
}
}
try {
conn.close();
} catch (Throwable e) {
if (!scheduleThreadToStop) {
logger.error(e.getMessage(), e);
}
}
}
// close PreparedStatement
if (null != preparedStatement) {
try {
preparedStatement.close();
} catch (Throwable e) {
if (!scheduleThreadToStop) {
logger.error(e.getMessage(), e);
}
}
}
// transaction commit
XxlJobAdminBootstrap.getInstance().getTransactionManager().commit(transactionStatus); // avlid schedule repeat
}
// transaction end
long cost = System.currentTimeMillis()-start;

@ -0,0 +1,13 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.xxl.job.admin.mapper.XxlJobLockMapper">
<select id="scheduleLock" resultType="java.lang.String" >
SELECT * FROM xxl_job_lock
WHERE lock_name = 'schedule_lock'
FOR UPDATE
</select>
</mapper>

@ -0,0 +1,49 @@
package com.xxl.job.admin.schedule;
import com.xxl.job.admin.scheduler.config.XxlJobAdminBootstrap;
import com.xxl.tool.core.DateTool;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.DefaultTransactionDefinition;
import java.util.Date;
import java.util.concurrent.TimeUnit;
@SpringBootTest
public class JobScheduleTest {
private static Logger logger = LoggerFactory.getLogger(JobScheduleTest.class);
@Test
public void test() throws InterruptedException {
// thread
for (int i = 0; i < 10; i++) {
int finalI = i;
new Thread(() -> {
lockTest("threadName-" + finalI);
}).start();
}
TimeUnit.MINUTES.sleep(10);
}
private void lockTest(String threadName) {
TransactionStatus transactionStatus = XxlJobAdminBootstrap.getInstance().getTransactionManager().getTransaction(new DefaultTransactionDefinition());
try {
String lockedRecord = XxlJobAdminBootstrap.getInstance().getXxlJobLockMapper().scheduleLock(); // for update
logger.info(threadName + " : start at " + DateTool.format(new Date(), "yyyy-MM-dd HH:mm:ss SSS") );
TimeUnit.MILLISECONDS.sleep(500);
logger.info(threadName + " : end at " + DateTool.format(new Date(), "yyyy-MM-dd HH:mm:ss SSS") );
} catch (Throwable e) {
logger.error("error: ", e);
} finally {
logger.info(threadName + " : commit at " + DateTool.format(new Date(), "yyyy-MM-dd HH:mm:ss SSS") );
XxlJobAdminBootstrap.getInstance().getTransactionManager().commit(transactionStatus);
}
}
}
Loading…
Cancel
Save