diff --git a/doc/XXL-JOB官方文档.md b/doc/XXL-JOB官方文档.md index 550d295c..7febfc33 100644 --- a/doc/XXL-JOB官方文档.md +++ b/doc/XXL-JOB官方文档.md @@ -2573,9 +2573,9 @@ public void execute() { - 1、【升级】升级多项maven依赖至较新版本,如 netty、groovy、spring、spring-ai、dify 等; - 2、【修复】合并PR-2369,修复脚本任务参数取值问题; - 3、【优化】报表统计SQL优化,修复小概率情况下查询null值问题; -- 4、【重构】调度中心底层组件重构,组件初始化以及销毁逻辑统一处理,任务触发及和回调逻辑优化,避免资源泄漏风险; -- 5、【重构】调度中心底层组件模块化拆分,移除组件单例以及静态代码逻辑,提升组件可维护性; -- 6、【ING】事务SQL下沉至Mapper统一管理,降低维护成本; +- 4、【重构】任务调度中心底层组件重构,组件初始化以及销毁逻辑统一处理,任务触发及和回调逻辑优化,避免资源泄漏风险; +- 5、【重构】任务调度中心底层组件模块化拆分,移除组件单例以及静态代码逻辑,提升组件可维护性; +- 6、【优化】任务调度中心调度锁逻辑优化,事务SQL下沉至Mapper层统一管理,并增加测试用例,提升代码可读性以及可维护性; - 7、【ING】UI框架重构升级,提升交互体验; - 8、【ING】调整资源加载逻辑,移除不必要的拦截器逻辑,提升页面加载效率; - 9、【ING】规范API交互协议,通用响应结构体调整为Response; diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/mapper/XxlJobLockMapper.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/mapper/XxlJobLockMapper.java new file mode 100644 index 00000000..27d2cbbc --- /dev/null +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/mapper/XxlJobLockMapper.java @@ -0,0 +1,18 @@ +package com.xxl.job.admin.mapper; + +import org.apache.ibatis.annotations.Mapper; + +/** + * job lock + * + * @author xuxueli 2016-1-12 18:03:45 + */ +@Mapper +public interface XxlJobLockMapper { + + /** + * get schedule lock + */ + String scheduleLock(); + +} \ No newline at end of file diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/scheduler/config/XxlJobAdminBootstrap.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/scheduler/config/XxlJobAdminBootstrap.java index 59a77160..b70f6aff 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/scheduler/config/XxlJobAdminBootstrap.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/scheduler/config/XxlJobAdminBootstrap.java @@ -9,6 +9,7 @@ import com.xxl.job.admin.util.I18nUtil; import com.xxl.job.core.biz.ExecutorBiz; import com.xxl.job.core.biz.client.ExecutorBizClient; import com.xxl.job.core.enums.ExecutorBlockStrategyEnum; +import com.xxl.tool.core.StringTool; import jakarta.annotation.Resource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -17,8 +18,8 @@ import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Value; import org.springframework.mail.javamail.JavaMailSender; import org.springframework.stereotype.Component; +import org.springframework.transaction.PlatformTransactionManager; -import javax.sql.DataSource; import java.util.Arrays; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -149,7 +150,7 @@ public class XxlJobAdminBootstrap implements InitializingBean, DisposableBean { private static ConcurrentMap executorBizRepository = new ConcurrentHashMap(); public static ExecutorBiz getExecutorBiz(String address) throws Exception { // valid - if (address==null || address.trim().length()==0) { + if (StringTool.isBlank(address)) { return null; } @@ -206,9 +207,13 @@ public class XxlJobAdminBootstrap implements InitializingBean, DisposableBean { @Resource private XxlJobLogReportMapper xxlJobLogReportMapper; @Resource + private XxlJobLockMapper xxlJobLockMapper; + @Resource private JavaMailSender mailSender; + /*@Resource + private DataSource dataSource;*/ @Resource - private DataSource dataSource; + private PlatformTransactionManager transactionManager; @Resource private JobAlarmer jobAlarmer; @Resource @@ -277,12 +282,20 @@ public class XxlJobAdminBootstrap implements InitializingBean, DisposableBean { return xxlJobLogReportMapper; } + public XxlJobLockMapper getXxlJobLockMapper() { + return xxlJobLockMapper; + } + public JavaMailSender getMailSender() { return mailSender; } - public DataSource getDataSource() { + /*public DataSource getDataSource() { return dataSource; + }*/ + + public PlatformTransactionManager getTransactionManager() { + return transactionManager; } public JobAlarmer getJobAlarmer() { diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/scheduler/thread/JobScheduleHelper.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/scheduler/thread/JobScheduleHelper.java index cdb89d91..c77b1522 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/scheduler/thread/JobScheduleHelper.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/scheduler/thread/JobScheduleHelper.java @@ -6,11 +6,12 @@ import com.xxl.job.admin.scheduler.cron.CronExpression; import com.xxl.job.admin.scheduler.enums.MisfireStrategyEnum; import com.xxl.job.admin.scheduler.enums.ScheduleTypeEnum; import com.xxl.job.admin.scheduler.trigger.TriggerTypeEnum; +import com.xxl.tool.core.CollectionTool; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.transaction.TransactionStatus; +import org.springframework.transaction.support.DefaultTransactionDefinition; -import java.sql.Connection; -import java.sql.PreparedStatement; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; @@ -40,6 +41,7 @@ public class JobScheduleHelper { @Override public void run() { + // align time try { TimeUnit.MILLISECONDS.sleep(5000 - System.currentTimeMillis()%1000 ); } catch (Throwable e) { @@ -52,31 +54,24 @@ public class JobScheduleHelper { // pre-read count: treadpool-size * trigger-qps (each trigger cost 50ms, qps = 1000/50 = 20) int preReadCount = (XxlJobAdminBootstrap.getInstance().getTriggerPoolFastMax() + XxlJobAdminBootstrap.getInstance().getTriggerPoolSlowMax()) * 20; + // do schedule while (!scheduleThreadToStop) { - // Scan Job + // param long start = System.currentTimeMillis(); - - Connection conn = null; - Boolean connAutoCommit = null; - PreparedStatement preparedStatement = null; - boolean preReadSuc = true; - try { - - conn = XxlJobAdminBootstrap.getInstance().getDataSource().getConnection(); - connAutoCommit = conn.getAutoCommit(); - conn.setAutoCommit(false); - - preparedStatement = conn.prepareStatement( "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" ); - preparedStatement.execute(); - - // tx start - // 1、pre read + // transaction start + TransactionStatus transactionStatus = XxlJobAdminBootstrap.getInstance().getTransactionManager().getTransaction(new DefaultTransactionDefinition()); + try { + // 1、job lock + String lockedRecord = XxlJobAdminBootstrap.getInstance().getXxlJobLockMapper().scheduleLock(); long nowTime = System.currentTimeMillis(); + + // scan and process job List scheduleList = XxlJobAdminBootstrap.getInstance().getXxlJobInfoMapper().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount); - if (scheduleList!=null && scheduleList.size()>0) { + if (CollectionTool.isNotEmpty(scheduleList)) { + // 2、push time-ring for (XxlJobInfo jobInfo: scheduleList) { @@ -145,51 +140,15 @@ public class JobScheduleHelper { preReadSuc = false; } - // tx stop - - } catch (Throwable e) { if (!scheduleThreadToStop) { - logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread error:{}", e); + logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread error:{}", e.getMessage(), e); } } finally { - - // commit - if (conn != null) { - try { - conn.commit(); - } catch (Throwable e) { - if (!scheduleThreadToStop) { - logger.error(e.getMessage(), e); - } - } - try { - conn.setAutoCommit(connAutoCommit); - } catch (Throwable e) { - if (!scheduleThreadToStop) { - logger.error(e.getMessage(), e); - } - } - try { - conn.close(); - } catch (Throwable e) { - if (!scheduleThreadToStop) { - logger.error(e.getMessage(), e); - } - } - } - - // close PreparedStatement - if (null != preparedStatement) { - try { - preparedStatement.close(); - } catch (Throwable e) { - if (!scheduleThreadToStop) { - logger.error(e.getMessage(), e); - } - } - } + // transaction commit + XxlJobAdminBootstrap.getInstance().getTransactionManager().commit(transactionStatus); // avlid schedule repeat } + // transaction end long cost = System.currentTimeMillis()-start; diff --git a/xxl-job-admin/src/main/resources/mapper/XxlJobLockMapper.xml b/xxl-job-admin/src/main/resources/mapper/XxlJobLockMapper.xml new file mode 100644 index 00000000..ee365998 --- /dev/null +++ b/xxl-job-admin/src/main/resources/mapper/XxlJobLockMapper.xml @@ -0,0 +1,13 @@ + + + + + + + + \ No newline at end of file diff --git a/xxl-job-admin/src/test/java/com/xxl/job/admin/schedule/JobScheduleTest.java b/xxl-job-admin/src/test/java/com/xxl/job/admin/schedule/JobScheduleTest.java new file mode 100644 index 00000000..7ab93170 --- /dev/null +++ b/xxl-job-admin/src/test/java/com/xxl/job/admin/schedule/JobScheduleTest.java @@ -0,0 +1,49 @@ +package com.xxl.job.admin.schedule; + +import com.xxl.job.admin.scheduler.config.XxlJobAdminBootstrap; +import com.xxl.tool.core.DateTool; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.transaction.TransactionStatus; +import org.springframework.transaction.support.DefaultTransactionDefinition; + +import java.util.Date; +import java.util.concurrent.TimeUnit; + +@SpringBootTest +public class JobScheduleTest { + private static Logger logger = LoggerFactory.getLogger(JobScheduleTest.class); + + @Test + public void test() throws InterruptedException { + + // thread + for (int i = 0; i < 10; i++) { + int finalI = i; + new Thread(() -> { + lockTest("threadName-" + finalI); + }).start(); + } + + TimeUnit.MINUTES.sleep(10); + } + + private void lockTest(String threadName) { + + TransactionStatus transactionStatus = XxlJobAdminBootstrap.getInstance().getTransactionManager().getTransaction(new DefaultTransactionDefinition()); + try { + String lockedRecord = XxlJobAdminBootstrap.getInstance().getXxlJobLockMapper().scheduleLock(); // for update + + logger.info(threadName + " : start at " + DateTool.format(new Date(), "yyyy-MM-dd HH:mm:ss SSS") ); + TimeUnit.MILLISECONDS.sleep(500); + logger.info(threadName + " : end at " + DateTool.format(new Date(), "yyyy-MM-dd HH:mm:ss SSS") ); + } catch (Throwable e) { + logger.error("error: ", e); + } finally { + logger.info(threadName + " : commit at " + DateTool.format(new Date(), "yyyy-MM-dd HH:mm:ss SSS") ); + XxlJobAdminBootstrap.getInstance().getTransactionManager().commit(transactionStatus); + } + } +}