From 7e49a111d3abea4f21b0fed4fc7badb0f1917666 Mon Sep 17 00:00:00 2001 From: Yang Libin Date: Thu, 13 Jul 2023 20:11:10 +0800 Subject: [PATCH] =?UTF-8?q?Update=20SpringBootBatch=E6=BA=90=E7=A0=81.md?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- docs/SpringBootBatch/SpringBootBatch源码.md | 2479 ++++++++--------- 1 file changed, 1214 insertions(+), 1265 deletions(-) diff --git a/docs/SpringBootBatch/SpringBootBatch源码.md b/docs/SpringBootBatch/SpringBootBatch源码.md index 8abb589..28effec 100644 --- a/docs/SpringBootBatch/SpringBootBatch源码.md +++ b/docs/SpringBootBatch/SpringBootBatch源码.md @@ -5,7 +5,6 @@ 版本使用 2.7.13 ```xml - org.springframework.boot spring-boot-dependencies @@ -56,54 +55,54 @@ JobBuilderFactory 需要`jobRepository`。`SimpleBatchConfiguration#jobRepositor 接着进入`initialize`方法 ```java - /** +/** * Sets up the basic components by extracting them from the {@link BatchConfigurer configurer}, defaulting to some * sensible values as long as a unique DataSource is available. * * @throws Exception if there is a problem in the configurer */ -protected void initialize()throws Exception{ - if(initialized){ +protected void initialize() throws Exception { + if (initialized) { return; - } - BatchConfigurer configurer=getConfigurer(context.getBeansOfType(BatchConfigurer.class).values()); - jobRepository.set(configurer.getJobRepository()); - jobLauncher.set(configurer.getJobLauncher()); - transactionManager.set(configurer.getTransactionManager()); - jobRegistry.set(new MapJobRegistry()); - jobExplorer.set(configurer.getJobExplorer()); - initialized=true; - } + } + BatchConfigurer configurer = getConfigurer(context.getBeansOfType(BatchConfigurer.class).values()); + jobRepository.set(configurer.getJobRepository()); + jobLauncher.set(configurer.getJobLauncher()); + transactionManager.set(configurer.getTransactionManager()); + jobRegistry.set(new MapJobRegistry()); + jobExplorer.set(configurer.getJobExplorer()); + initialized = true; +} ``` > BatchConfigurer configurer=getConfigurer(context.getBeansOfType(BatchConfigurer.class).values()); ```java - protected BatchConfigurer getConfigurer(Collection configurers)throws Exception{ - if(this.configurer!=null){ +protected BatchConfigurer getConfigurer(Collection configurers) throws Exception { + if (this.configurer != null) { return this.configurer; + } + if (configurers == null || configurers.isEmpty()) { + if (dataSource == null) { + DefaultBatchConfigurer configurer = new DefaultBatchConfigurer(); + configurer.initialize(); + this.configurer = configurer; + return configurer; + } else { + DefaultBatchConfigurer configurer = new DefaultBatchConfigurer(dataSource); + configurer.initialize(); + this.configurer = configurer; + return configurer; } - if(configurers==null||configurers.isEmpty()){ - if(dataSource==null){ - DefaultBatchConfigurer configurer=new DefaultBatchConfigurer(); - configurer.initialize(); - this.configurer=configurer; - return configurer; - }else{ - DefaultBatchConfigurer configurer=new DefaultBatchConfigurer(dataSource); - configurer.initialize(); - this.configurer=configurer; - return configurer; - } - } - if(configurers.size()>1){ + } + if (configurers.size() > 1) { throw new IllegalStateException( - "To use a custom BatchConfigurer the context must contain precisely one, found " - +configurers.size()); - } - this.configurer=configurers.iterator().next(); - return this.configurer; - } + "To use a custom BatchConfigurer the context must contain precisely one, found " + + configurers.size()); + } + this.configurer = configurers.iterator().next(); + return this.configurer; +} ``` 从 spring 中获取 `BatchConfigurer`,没有使用 `DefaultBatchConfigurer`,并调用 `configurer.initialize()` @@ -193,30 +192,30 @@ public class BatchConfig { 使用`JobBuilderFactory`创建出`Step`。`JobBuilderFactory`在上面`AbstractBatchConfiguration`中创建了。 ```java - @Bean -public Step step(){ - TaskletStep taskletStep=stepBuilders.get("test-step") - .chunk(2) - .reader(itemReader()) - .writer(itemWriter()) - .listener(new StepExecutionListener(){ -@Override -public void beforeStep(StepExecution stepExecution){ - System.out.println("step-我是beforeJob--"+stepExecution); - } +@Bean +public Step step() { + TaskletStep taskletStep = stepBuilders.get("test-step") + .chunk(2) + .reader(itemReader()) + .writer(itemWriter()) + .listener(new StepExecutionListener() { + @Override + public void beforeStep(StepExecution stepExecution) { + System.out.println("step-我是beforeJob--" + stepExecution); + } -@Override -public ExitStatus afterStep(StepExecution stepExecution){ - System.out.println("step-我是beforeJob--"+stepExecution); - return stepExecution.getExitStatus(); - } - }) + @Override + public ExitStatus afterStep(StepExecution stepExecution) { + System.out.println("step-我是beforeJob--" + stepExecution); + return stepExecution.getExitStatus(); + } + }) // .startLimit(2) - .build(); + .build(); - taskletStep.setAllowStartIfComplete(true); - return taskletStep; - } + taskletStep.setAllowStartIfComplete(true); + return taskletStep; +} ``` #### stepBuilders.get("test-stop") @@ -224,11 +223,11 @@ public ExitStatus afterStep(StepExecution stepExecution){ `StepBuilderFactory`只有一个方法 get ```java - public StepBuilder get(String name){ - StepBuilder builder=new StepBuilder(name).repository(jobRepository).transactionManager( - transactionManager); - return builder; - } +public StepBuilder get(String name) { + StepBuilder builder = new StepBuilder(name).repository(jobRepository).transactionManager( + transactionManager); + return builder; +} ``` `stepBuilders.get("test-stop")`创建一个`StepBuilder`。 @@ -237,10 +236,10 @@ public ExitStatus afterStep(StepExecution stepExecution){ 中有个`protected final CommonStepProperties properties;`属性。 对`CommonStepProperties`中`name`,`JobRepository`和`PlatformTransactionManager`进行赋值。 -#### .chunk(2) +#### chunk(2) ```java - /** +/** * Build a step that processes items in chunks with the size provided. To extend the step to being fault tolerant, * call the {@link SimpleStepBuilder#faultTolerant()} method on the builder. In most cases you will want to * parameterize your call to this method, to preserve the type safety of your readers and writers, e.g. @@ -256,8 +255,8 @@ public ExitStatus afterStep(StepExecution stepExecution){ * @param the type of item to be output */ public SimpleStepBuilder chunk(int chunkSize){ - return new SimpleStepBuilder(this).chunk(chunkSize); - } + return new SimpleStepBuilder(this).chunk(chunkSize); +} ``` 创建一个`SimpleStepBuilder`,并确定泛型,设置一个`chunkSize` @@ -273,21 +272,21 @@ public SimpleStepBuilder chunk(int chunkSize){ 创建一个 ```java - @Bean -public ItemReader itemReader(){ - return new ItemReader(){ -final AtomicInteger atomicInteger=new AtomicInteger(); +@Bean +public ItemReader itemReader() { + return new ItemReader() { + final AtomicInteger atomicInteger = new AtomicInteger(); -@Override -public Integer read()throws UnexpectedInputException,ParseException,NonTransientResourceException{ - if(atomicInteger.compareAndSet(10,0)){ - return null; - } - atomicInteger.getAndIncrement(); - return atomicInteger.intValue(); - } - }; + @Override + public Integer read() throws UnexpectedInputException, ParseException, NonTransientResourceException { + if (atomicInteger.compareAndSet(10, 0)) { + return null; + } + atomicInteger.getAndIncrement(); + return atomicInteger.intValue(); } + }; +} ``` 每次读取 1 个值,总共读取 10 个值。返回 null 表示没有数据了。 @@ -298,32 +297,31 @@ public Integer read()throws UnexpectedInputException,ParseException,NonTransient 返回值,后面表示`writer`入参。 ```java - @Bean -public ItemWriter itemWriter(){ - return new ItemWriter(){ -@Override -public void write(List items){ - System.out.println("一次读取:"+Arrays.toString(items.toArray())); - } - }; - +@Bean +public ItemWriter itemWriter() { + return new ItemWriter() { + @Override + public void write(List items) { + System.out.println("一次读取:" + Arrays.toString(items.toArray())); } + }; +} ``` #### .build(); ```java - /** +/** * Build a step with the reader, writer, processor as provided. * * @see org.springframework.batch.core.step.builder.AbstractTaskletStepBuilder#build() */ @Override -public TaskletStep build(){ - registerStepListenerAsItemListener(); - registerAsStreamsAndListeners(reader,processor,writer); - return super.build(); - } +public TaskletStep build() { + registerStepListenerAsItemListener(); + registerAsStreamsAndListeners(reader, processor, writer); + return super.build(); +} ``` `registerStepListenerAsItemListener()`:注册监听器,`ItemReadListener`,`ItemProcessListener`,`ItemWriteListener`. @@ -334,23 +332,23 @@ public TaskletStep build(){ `AbstractListenerFactoryBean.isListener(delegate, StepListener.class, StepListenerMetaData.values());`进行判断 ```java - BEFORE_STEP("beforeStep","before-step-method",BeforeStep.class,StepExecutionListener.class,StepExecution.class), - AFTER_STEP("afterStep","after-step-method",AfterStep.class,StepExecutionListener.class,StepExecution.class), - BEFORE_CHUNK("beforeChunk","before-chunk-method",BeforeChunk.class,ChunkListener.class,ChunkContext.class), - AFTER_CHUNK("afterChunk","after-chunk-method",AfterChunk.class,ChunkListener.class,ChunkContext.class), - AFTER_CHUNK_ERROR("afterChunkError","after-chunk-error-method",AfterChunkError.class,ChunkListener.class,ChunkContext.class), - BEFORE_READ("beforeRead","before-read-method",BeforeRead.class,ItemReadListener.class), - AFTER_READ("afterRead","after-read-method",AfterRead.class,ItemReadListener.class,Object.class), - ON_READ_ERROR("onReadError","on-read-error-method",OnReadError.class,ItemReadListener.class,Exception.class), - BEFORE_PROCESS("beforeProcess","before-process-method",BeforeProcess.class,ItemProcessListener.class,Object.class), - AFTER_PROCESS("afterProcess","after-process-method",AfterProcess.class,ItemProcessListener.class,Object.class,Object.class), - ON_PROCESS_ERROR("onProcessError","on-process-error-method",OnProcessError.class,ItemProcessListener.class,Object.class,Exception.class), - BEFORE_WRITE("beforeWrite","before-write-method",BeforeWrite.class,ItemWriteListener.class,List.class), - AFTER_WRITE("afterWrite","after-write-method",AfterWrite.class,ItemWriteListener.class,List.class), - ON_WRITE_ERROR("onWriteError","on-write-error-method",OnWriteError.class,ItemWriteListener.class,Exception.class,List.class), - ON_SKIP_IN_READ("onSkipInRead","on-skip-in-read-method",OnSkipInRead.class,SkipListener.class,Throwable.class), - ON_SKIP_IN_PROCESS("onSkipInProcess","on-skip-in-process-method",OnSkipInProcess.class,SkipListener.class,Object.class,Throwable.class), - ON_SKIP_IN_WRITE("onSkipInWrite","on-skip-in-write-method",OnSkipInWrite.class,SkipListener.class,Object.class,Throwable.class); +BEFORE_STEP("beforeStep","before-step-method",BeforeStep.class,StepExecutionListener.class,StepExecution.class), +AFTER_STEP("afterStep","after-step-method",AfterStep.class,StepExecutionListener.class,StepExecution.class), +BEFORE_CHUNK("beforeChunk","before-chunk-method",BeforeChunk.class,ChunkListener.class,ChunkContext.class), +AFTER_CHUNK("afterChunk","after-chunk-method",AfterChunk.class,ChunkListener.class,ChunkContext.class), +AFTER_CHUNK_ERROR("afterChunkError","after-chunk-error-method",AfterChunkError.class,ChunkListener.class,ChunkContext.class), +BEFORE_READ("beforeRead","before-read-method",BeforeRead.class,ItemReadListener.class), +AFTER_READ("afterRead","after-read-method",AfterRead.class,ItemReadListener.class,Object.class), +ON_READ_ERROR("onReadError","on-read-error-method",OnReadError.class,ItemReadListener.class,Exception.class), +BEFORE_PROCESS("beforeProcess","before-process-method",BeforeProcess.class,ItemProcessListener.class,Object.class), +AFTER_PROCESS("afterProcess","after-process-method",AfterProcess.class,ItemProcessListener.class,Object.class,Object.class), +ON_PROCESS_ERROR("onProcessError","on-process-error-method",OnProcessError.class,ItemProcessListener.class,Object.class,Exception.class), +BEFORE_WRITE("beforeWrite","before-write-method",BeforeWrite.class,ItemWriteListener.class,List.class), +AFTER_WRITE("afterWrite","after-write-method",AfterWrite.class,ItemWriteListener.class,List.class), +ON_WRITE_ERROR("onWriteError","on-write-error-method",OnWriteError.class,ItemWriteListener.class,Exception.class,List.class), +ON_SKIP_IN_READ("onSkipInRead","on-skip-in-read-method",OnSkipInRead.class,SkipListener.class,Throwable.class), +ON_SKIP_IN_PROCESS("onSkipInProcess","on-skip-in-process-method",OnSkipInProcess.class,SkipListener.class,Object.class,Throwable.class), +ON_SKIP_IN_WRITE("onSkipInWrite","on-skip-in-write-method",OnSkipInWrite.class,SkipListener.class,Object.class,Throwable.class); ``` 这些监听器之类的后面单独讲,所有这些之类的顶级接口`StepListener`。 @@ -358,49 +356,48 @@ public TaskletStep build(){ > super.build(); ```java - public TaskletStep build(){ +public TaskletStep build() { - registerStepListenerAsChunkListener(); + registerStepListenerAsChunkListener(); - TaskletStep step=new TaskletStep(getName()); + TaskletStep step = new TaskletStep(getName()); - super.enhance(step); + super.enhance(step); - step.setChunkListeners(chunkListeners.toArray(new ChunkListener[0])); + step.setChunkListeners(chunkListeners.toArray(new ChunkListener[0])); - if(transactionAttribute!=null){ + if (transactionAttribute != null) { step.setTransactionAttribute(transactionAttribute); - } + } - if(stepOperations==null){ + if (stepOperations == null) { - stepOperations=new RepeatTemplate(); + stepOperations = new RepeatTemplate(); - if(taskExecutor!=null){ - TaskExecutorRepeatTemplate repeatTemplate=new TaskExecutorRepeatTemplate(); - repeatTemplate.setTaskExecutor(taskExecutor); - repeatTemplate.setThrottleLimit(throttleLimit); - stepOperations=repeatTemplate; + if (taskExecutor != null) { + TaskExecutorRepeatTemplate repeatTemplate = new TaskExecutorRepeatTemplate(); + repeatTemplate.setTaskExecutor(taskExecutor); + repeatTemplate.setThrottleLimit(throttleLimit); + stepOperations = repeatTemplate; } - ((RepeatTemplate)stepOperations).setExceptionHandler(exceptionHandler); + ((RepeatTemplate) stepOperations).setExceptionHandler(exceptionHandler); - } - step.setStepOperations(stepOperations); - step.setTasklet(createTasklet()); + } + step.setStepOperations(stepOperations); + step.setTasklet(createTasklet()); - step.setStreams(streams.toArray(new ItemStream[0])); + step.setStreams(streams.toArray(new ItemStream[0])); - try{ + try { step.afterPropertiesSet(); - } - catch(Exception e){ + } catch (Exception e) { throw new StepBuilderException(e); - } + } - return step; + return step; - } +} ``` ###### super.enhance(step); @@ -429,57 +426,57 @@ taskExecutor:一般用来设置线程池线程池。这里我们也没有设 > step.setTasklet(createTasklet()); ```java - @Override -protected Tasklet createTasklet(){ - Assert.state(reader!=null,"ItemReader must be provided"); - Assert.state(writer!=null,"ItemWriter must be provided"); - RepeatOperations repeatOperations=createChunkOperations(); - SimpleChunkProvider chunkProvider=new SimpleChunkProvider<>(getReader(),repeatOperations); - SimpleChunkProcessor chunkProcessor=new SimpleChunkProcessor<>(getProcessor(),getWriter()); - chunkProvider.setListeners(new ArrayList<>(itemListeners)); - chunkProcessor.setListeners(new ArrayList<>(itemListeners)); - ChunkOrientedTasklet tasklet=new ChunkOrientedTasklet<>(chunkProvider,chunkProcessor); - tasklet.setBuffering(!readerTransactionalQueue); - return tasklet; - } +@Override +protected Tasklet createTasklet() { + Assert.state(reader != null, "ItemReader must be provided"); + Assert.state(writer != null, "ItemWriter must be provided"); + RepeatOperations repeatOperations = createChunkOperations(); + SimpleChunkProvider chunkProvider = new SimpleChunkProvider<>(getReader(), repeatOperations); + SimpleChunkProcessor chunkProcessor = new SimpleChunkProcessor<>(getProcessor(), getWriter()); + chunkProvider.setListeners(new ArrayList<>(itemListeners)); + chunkProcessor.setListeners(new ArrayList<>(itemListeners)); + ChunkOrientedTasklet tasklet = new ChunkOrientedTasklet<>(chunkProvider, chunkProcessor); + tasklet.setBuffering(!readerTransactionalQueue); + return tasklet; +} ``` `createChunkOperations();`:重复处理类,这里也是`RepeatTemplate`。 ```java - protected RepeatOperations createChunkOperations(){ - RepeatOperations repeatOperations=chunkOperations; - if(repeatOperations==null){ - RepeatTemplate repeatTemplate=new RepeatTemplate(); +protected RepeatOperations createChunkOperations() { + RepeatOperations repeatOperations = chunkOperations; + if (repeatOperations == null) { + RepeatTemplate repeatTemplate = new RepeatTemplate(); repeatTemplate.setCompletionPolicy(getChunkCompletionPolicy()); - repeatOperations=repeatTemplate; - } - return repeatOperations; - } + repeatOperations = repeatTemplate; + } + return repeatOperations; +} ``` 里面的`repeatTemplate.setCompletionPolicy(getChunkCompletionPolicy());`。 ```java - /** +/** * @return a {@link CompletionPolicy} consistent with the chunk size and injected policy (if present). */ -protected CompletionPolicy getChunkCompletionPolicy(){ - Assert.state(!(completionPolicy!=null&&chunkSize>0), - "You must specify either a chunkCompletionPolicy or a commitInterval but not both."); - Assert.state(chunkSize>=0,"The commitInterval must be positive or zero (for default value)."); +protected CompletionPolicy getChunkCompletionPolicy() { + Assert.state(!(completionPolicy != null && chunkSize > 0), + "You must specify either a chunkCompletionPolicy or a commitInterval but not both."); + Assert.state(chunkSize >= 0, "The commitInterval must be positive or zero (for default value)."); - if(completionPolicy!=null){ + if (completionPolicy != null) { return completionPolicy; + } + if (chunkSize == 0) { + if (logger.isInfoEnabled()) { + logger.info("Setting commit interval to default value (" + DEFAULT_COMMIT_INTERVAL + ")"); } - if(chunkSize==0){ - if(logger.isInfoEnabled()){ - logger.info("Setting commit interval to default value ("+DEFAULT_COMMIT_INTERVAL+")"); - } - chunkSize=DEFAULT_COMMIT_INTERVAL; - } - return new SimpleCompletionPolicy(chunkSize); - } + chunkSize = DEFAULT_COMMIT_INTERVAL; + } + return new SimpleCompletionPolicy(chunkSize); +} ``` 每次处理大小:`在固定数量的操作后终止批处理的策略。维护内部状态并增加计数器,因此成功使用此策略需要每个批处理项仅调用一次isComplete()。使用标准的RepeatTemplate应确保保留此合同,但需要仔细监控。` @@ -499,17 +496,17 @@ protected CompletionPolicy getChunkCompletionPolicy(){ 使用`chunkProvider, chunkProcessor`构建出`ChunkOrientedTasklet`返回给`step`的`tasklet`。 ```java - /** +/** * Public setter for the {@link Tasklet}. * * @param tasklet the {@link Tasklet} to set */ -public void setTasklet(Tasklet tasklet){ - this.tasklet=tasklet; - if(tasklet instanceof StepExecutionListener){ - registerStepExecutionListener((StepExecutionListener)tasklet); - } - } +public void setTasklet(Tasklet tasklet) { + this.tasklet = tasklet; + if (tasklet instanceof StepExecutionListener) { + registerStepExecutionListener((StepExecutionListener) tasklet); + } +} ``` 这里默认构建出来的不是`StepExecutionListener`。是`ChunkOrientedTasklet`。这里不是主体流程,下面补充。 @@ -523,12 +520,12 @@ public void setTasklet(Tasklet tasklet){ 使用`JobBuilderFactory`创建出`Step`。`JobBuilderFactory`在上面`AbstractBatchConfiguration`创建了。 ```java - @Bean -public Job footballjob(){ - return jobBuilders.get("test") - .start(step()) - .build(); - } +@Bean +public Job footballjob() { + return jobBuilders.get("test") + .start(step()) + .build(); +} ``` #### jobBuilders.get("test") @@ -538,15 +535,15 @@ public Job footballjob(){ #### .start(step()) ```java - /** +/** * Create a new job builder that will execute a step or sequence of steps. * * @param step a step to execute * @return a {@link SimpleJobBuilder} */ -public SimpleJobBuilder start(Step step){ - return new SimpleJobBuilder(this).start(step); - } +public SimpleJobBuilder start(Step step) { + return new SimpleJobBuilder(this).start(step); +} ``` 创建一个`SimpleJobBuilder`,set 属性` private List steps = new ArrayList<>();`。 @@ -554,21 +551,20 @@ public SimpleJobBuilder start(Step step){ #### .build(); ```java - public Job build(){ - if(builder!=null){ +public Job build() { + if (builder != null) { return builder.end().build(); - } - SimpleJob job=new SimpleJob(getName()); - super.enhance(job); - job.setSteps(steps); - try{ + } + SimpleJob job = new SimpleJob(getName()); + super.enhance(job); + job.setSteps(steps); + try { job.afterPropertiesSet(); - } - catch(Exception e){ + } catch (Exception e) { throw new JobBuilderException(e); - } - return job; - } + } + return job; +} ``` 真正构建出来的是`SimpleJob`对象。 @@ -576,28 +572,28 @@ public SimpleJobBuilder start(Step step){ > super.enhance(job); ```java - protected void enhance(Job target){ - if(target instanceof AbstractJob){ - AbstractJob job=(AbstractJob)target; +protected void enhance(Job target) { + if (target instanceof AbstractJob) { + AbstractJob job = (AbstractJob) target; job.setJobRepository(properties.getJobRepository()); - JobParametersIncrementer jobParametersIncrementer=properties.getJobParametersIncrementer(); - if(jobParametersIncrementer!=null){ - job.setJobParametersIncrementer(jobParametersIncrementer); - } - JobParametersValidator jobParametersValidator=properties.getJobParametersValidator(); - if(jobParametersValidator!=null){ - job.setJobParametersValidator(jobParametersValidator); - } - Boolean restartable=properties.getRestartable(); - if(restartable!=null){ - job.setRestartable(restartable); + JobParametersIncrementer jobParametersIncrementer = properties.getJobParametersIncrementer(); + if (jobParametersIncrementer != null) { + job.setJobParametersIncrementer(jobParametersIncrementer); } - List listeners=properties.getJobExecutionListeners(); - if(!listeners.isEmpty()){ - job.setJobExecutionListeners(listeners.toArray(new JobExecutionListener[0])); + JobParametersValidator jobParametersValidator = properties.getJobParametersValidator(); + if (jobParametersValidator != null) { + job.setJobParametersValidator(jobParametersValidator); } + Boolean restartable = properties.getRestartable(); + if (restartable != null) { + job.setRestartable(restartable); } + List listeners = properties.getJobExecutionListeners(); + if (!listeners.isEmpty()) { + job.setJobExecutionListeners(listeners.toArray(new JobExecutionListener[0])); } + } +} ``` 把`CommonJobProperties properties`的属性赋值,比如`JobRepository`,`JobParametersIncrementer`,`JobParametersValidator` @@ -606,10 +602,10 @@ public SimpleJobBuilder start(Step step){ > job.setJobExecutionListeners(listeners.toArray(new JobExecutionListener[0])); ```java - List listeners=properties.getJobExecutionListeners(); - if(!listeners.isEmpty()){ - job.setJobExecutionListeners(listeners.toArray(new JobExecutionListener[0])); - } +List listeners = properties.getJobExecutionListeners(); +if (!listeners.isEmpty()) { + job.setJobExecutionListeners(listeners.toArray(new JobExecutionListener[0])); +} ``` 获取里面的`JobExecutionListener`set。 @@ -628,7 +624,6 @@ public SimpleJobBuilder start(Step step){ 也是使用这个`JobLauncherApplicationRunner#executeRegisteredJobs(JobParameters jobParameters)`。 ```java - @RestController public class BatchController { @@ -659,22 +654,22 @@ public class BatchController { #### `JobExecution lastExecution = jobRepository.getLastJobExecution(job.getName(), jobParameters);` ```java - @Override +@Override @Nullable -public JobExecution getLastJobExecution(String jobName,JobParameters jobParameters){ - JobInstance jobInstance=jobInstanceDao.getJobInstance(jobName,jobParameters); - if(jobInstance==null){ +public JobExecution getLastJobExecution(String jobName, JobParameters jobParameters) { + JobInstance jobInstance = jobInstanceDao.getJobInstance(jobName, jobParameters); + if (jobInstance == null) { return null; - } - JobExecution jobExecution=jobExecutionDao.getLastJobExecution(jobInstance); + } + JobExecution jobExecution = jobExecutionDao.getLastJobExecution(jobInstance); - if(jobExecution!=null){ + if (jobExecution != null) { jobExecution.setExecutionContext(ecDao.getExecutionContext(jobExecution)); stepExecutionDao.addStepExecutions(jobExecution); - } - return jobExecution; + } + return jobExecution; - } +} ``` `MapJobInstanceDao`已弃用 从 v4.3 开始,赞成使用 与 JdbcJobInstanceDao 内存数据库一起使用。计划在 v5.0 中删除。 @@ -692,50 +687,49 @@ public JobExecution getLastJobExecution(String jobName,JobParameters jobParamete `SimpleJobRepository#createJobExecution(String jobName, JobParameters jobParameters)` ```java - @Override -public JobExecution createJobExecution(String jobName,JobParameters jobParameters){ - JobInstance jobInstance=jobInstanceDao.getJobInstance(jobName,jobParameters); - ExecutionContext executionContext; - if(jobInstance!=null){ - List executions=jobExecutionDao.findJobExecutions(jobInstance); - if(executions.isEmpty()){ - throw new IllegalStateException("Cannot find any job execution for job instance: "+jobInstance); - } - for(JobExecution execution:executions){ - if(execution.isRunning()||execution.isStopping()){ - throw new JobExecutionAlreadyRunningException("A job execution for this job is already running: " - +jobInstance); - } - BatchStatus status=execution.getStatus(); - if(status==BatchStatus.UNKNOWN){ - throw new JobRestartException("Cannot restart job from UNKNOWN status. " - +"The last execution ended with a failure that could not be rolled back, " - +"so it may be dangerous to proceed. Manual intervention is probably necessary."); - } - Collection allJobParameters=execution.getJobParameters().getParameters().values(); - long identifyingJobParametersCount=allJobParameters.stream().filter(JobParameter::isIdentifying).count(); - if(identifyingJobParametersCount>0&&(status==BatchStatus.COMPLETED||status==BatchStatus.ABANDONED)){ - throw new JobInstanceAlreadyCompleteException( - "A job instance already exists and is complete for parameters="+jobParameters - +". If you want to run this job again, change the parameters."); - } - } - executionContext=ecDao.getExecutionContext(jobExecutionDao.getLastJobExecution(jobInstance)); +@Override +public JobExecution createJobExecution(String jobName, JobParameters jobParameters) { + JobInstance jobInstance = jobInstanceDao.getJobInstance(jobName, jobParameters); + ExecutionContext executionContext; + if (jobInstance != null) { + List executions = jobExecutionDao.findJobExecutions(jobInstance); + if (executions.isEmpty()) { + throw new IllegalStateException("Cannot find any job execution for job instance: " + jobInstance); + } + for (JobExecution execution : executions) { + if (execution.isRunning() || execution.isStopping()) { + throw new JobExecutionAlreadyRunningException("A job execution for this job is already running: " + + jobInstance); + } + BatchStatus status = execution.getStatus(); + if (status == BatchStatus.UNKNOWN) { + throw new JobRestartException("Cannot restart job from UNKNOWN status. " + + "The last execution ended with a failure that could not be rolled back, " + + "so it may be dangerous to proceed. Manual intervention is probably necessary."); + } + Collection allJobParameters = execution.getJobParameters().getParameters().values(); + long identifyingJobParametersCount = allJobParameters.stream().filter(JobParameter::isIdentifying).count(); + if (identifyingJobParametersCount > 0 && (status == BatchStatus.COMPLETED || status == BatchStatus.ABANDONED)) { + throw new JobInstanceAlreadyCompleteException( + "A job instance already exists and is complete for parameters=" + jobParameters + + ". If you want to run this job again, change the parameters."); + } } - else{ + executionContext = ecDao.getExecutionContext(jobExecutionDao.getLastJobExecution(jobInstance)); + } else { // no job found, create one - jobInstance=jobInstanceDao.createJobInstance(jobName,jobParameters); - executionContext=new ExecutionContext(); - } - JobExecution jobExecution=new JobExecution(jobInstance,jobParameters,null); - jobExecution.setExecutionContext(executionContext); - jobExecution.setLastUpdated(new Date(System.currentTimeMillis())); - // Save the JobExecution so that it picks up an ID (useful for clients - // monitoring asynchronous executions): - jobExecutionDao.saveJobExecution(jobExecution); - ecDao.saveExecutionContext(jobExecution); - return jobExecution; - } + jobInstance = jobInstanceDao.createJobInstance(jobName, jobParameters); + executionContext = new ExecutionContext(); + } + JobExecution jobExecution = new JobExecution(jobInstance, jobParameters, null); + jobExecution.setExecutionContext(executionContext); + jobExecution.setLastUpdated(new Date(System.currentTimeMillis())); + // Save the JobExecution so that it picks up an ID (useful for clients + // monitoring asynchronous executions): + jobExecutionDao.saveJobExecution(jobExecution); + ecDao.saveExecutionContext(jobExecution); + return jobExecution; +} ``` 首先`JobInstance jobInstance = jobInstanceDao.getJobInstance(jobName, jobParameters);`。这里的`jobInstanceDao`是 @@ -745,14 +739,14 @@ Mapxxx,注意这几个类都是过时的,我们先把流程弄懂在看这 ##### jobInstanceDao#createJobInstance(String jobName, JobParameters jobParameters) ```java - @Override -public JobInstance createJobInstance(String jobName,JobParameters jobParameters){ - Assert.state(getJobInstance(jobName,jobParameters)==null,"JobInstance must not already exist"); - JobInstance jobInstance=new JobInstance(currentId.getAndIncrement(),jobName); - jobInstance.incrementVersion(); - jobInstances.put(jobName+"|"+jobKeyGenerator.generateKey(jobParameters),jobInstance); - return jobInstance; - } +@Override +public JobInstance createJobInstance(String jobName, JobParameters jobParameters) { + Assert.state(getJobInstance(jobName, jobParameters) == null, "JobInstance must not already exist"); + JobInstance jobInstance = new JobInstance(currentId.getAndIncrement(), jobName); + jobInstance.incrementVersion(); + jobInstances.put(jobName + "|" + jobKeyGenerator.generateKey(jobParameters), jobInstance); + return jobInstance; +} ``` 就是`Map jobInstances = new ConcurrentHashMap<>();`put @@ -775,11 +769,11 @@ public JobInstance createJobInstance(String jobName,JobParameters jobParameters) ##### jobExecutionDao.saveJobExecution(jobExecution); ```java - Assert.isTrue(jobExecution.getId()==null,"jobExecution id is not null"); - Long newId=currentId.getAndIncrement(); - jobExecution.setId(newId); - jobExecution.incrementVersion(); - executionsById.put(newId,copy(jobExecution)); +Assert.isTrue(jobExecution.getId() == null, "jobExecution id is not null"); +Long newId = currentId.getAndIncrement(); +jobExecution.setId(newId); +jobExecution.incrementVersion(); +executionsById.put(newId, copy(jobExecution)); ``` 来到`MapJobExecutionDao`类,这 4 个`Dao`后面单独讲解。现在都是使用 MapxxxDao。 @@ -792,18 +786,18 @@ public JobInstance createJobInstance(String jobName,JobParameters jobParameters) ##### ecDao.saveExecutionContext(jobExecution); ```java - @Override -public void saveExecutionContext(JobExecution jobExecution){ - updateExecutionContext(jobExecution); - } +@Override +public void saveExecutionContext(JobExecution jobExecution) { + updateExecutionContext(jobExecution); +} @Override -public void updateExecutionContext(JobExecution jobExecution){ - ExecutionContext executionContext=jobExecution.getExecutionContext(); - if(executionContext!=null){ - contexts.put(ContextKey.job(jobExecution.getId()),copy(executionContext)); - } - } +public void updateExecutionContext(JobExecution jobExecution) { + ExecutionContext executionContext = jobExecution.getExecutionContext(); + if (executionContext != null) { + contexts.put(ContextKey.job(jobExecution.getId()), copy(executionContext)); + } +} ``` 值放入`ConcurrentMap contexts = TransactionAwareProxyFactory.createAppendOnlyTransactionalMap()` @@ -812,35 +806,33 @@ public void updateExecutionContext(JobExecution jobExecution){ #### taskExecutor.execute(new Runnable() ) ```java - try{ - taskExecutor.execute(new Runnable(){ -@Override -public void run(){ - try{ - job.execute(jobExecution); - } - catch(Throwable t){ - rethrow(t); - } - } -private void rethrow(Throwable t){ - if(t instanceof RuntimeException){ - throw(RuntimeException)t; - } - else if(t instanceof Error){ - throw(Error)t; - } - throw new IllegalStateException(t); +try { + taskExecutor.execute(new Runnable() { + @Override + public void run() { + try { + job.execute(jobExecution); + } catch (Throwable t) { + rethrow(t); + } } - }); + + private void rethrow(Throwable t) { + if (t instanceof RuntimeException) { + throw (RuntimeException) t; + } else if (t instanceof Error) { + throw (Error) t; + } + throw new IllegalStateException(t); } - catch(TaskRejectedException e){ - jobExecution.upgradeStatus(BatchStatus.FAILED); - if(jobExecution.getExitStatus().equals(ExitStatus.UNKNOWN)){ + }); +} catch (TaskRejectedException e) { + jobExecution.upgradeStatus(BatchStatus.FAILED); + if (jobExecution.getExitStatus().equals(ExitStatus.UNKNOWN)) { jobExecution.setExitStatus(ExitStatus.FAILED.addExitDescription(e)); - } - jobRepository.update(jobExecution); - } + } + jobRepository.update(jobExecution); +} ``` `taskExecutor`是在`afterPropertiesSet()`方法创建的,为`SyncTaskExecutor` @@ -855,104 +847,104 @@ private void rethrow(Throwable t){ job 是`SimpleJob`类。进入`AbstractJob#execute(JobExecution execution)`方法。 ```java - @Override -public final void execute(JobExecution execution){ - Assert.notNull(execution,"jobExecution must not be null"); - if(logger.isDebugEnabled()){ - logger.debug("Job execution starting: "+execution); - } - JobSynchronizationManager.register(execution); - LongTaskTimer longTaskTimer=BatchMetrics.createLongTaskTimer("job.active","Active jobs", - Tag.of("name",execution.getJobInstance().getJobName())); - LongTaskTimer.Sample longTaskTimerSample=longTaskTimer.start(); - Timer.Sample timerSample=BatchMetrics.createTimerSample(); - try{ +@Override +public final void execute(JobExecution execution) { + Assert.notNull(execution, "jobExecution must not be null"); + if (logger.isDebugEnabled()) { + logger.debug("Job execution starting: " + execution); + } + JobSynchronizationManager.register(execution); + LongTaskTimer longTaskTimer = BatchMetrics.createLongTaskTimer("job.active", "Active jobs", + Tag.of("name", execution.getJobInstance().getJobName())); + LongTaskTimer.Sample longTaskTimerSample = longTaskTimer.start(); + Timer.Sample timerSample = BatchMetrics.createTimerSample(); + try { jobParametersValidator.validate(execution.getJobParameters()); - if(execution.getStatus()!=BatchStatus.STOPPING){ - execution.setStartTime(new Date()); - updateStatus(execution,BatchStatus.STARTED); - listener.beforeJob(execution); - try{ - doExecute(execution); - if(logger.isDebugEnabled()){ - logger.debug("Job execution complete: "+execution); - } - }catch(RepeatException e){ - throw e.getCause(); - } - }else{ - // The job was already stopped before we even got this far. Deal - // with it in the same way as any other interruption. - execution.setStatus(BatchStatus.STOPPED); - execution.setExitStatus(ExitStatus.COMPLETED); - if(logger.isDebugEnabled()){ - logger.debug("Job execution was stopped: "+execution); - } + if (execution.getStatus() != BatchStatus.STOPPING) { + execution.setStartTime(new Date()); + updateStatus(execution, BatchStatus.STARTED); + listener.beforeJob(execution); + try { + doExecute(execution); + if (logger.isDebugEnabled()) { + logger.debug("Job execution complete: " + execution); + } + } catch (RepeatException e) { + throw e.getCause(); + } + } else { + // The job was already stopped before we even got this far. Deal + // with it in the same way as any other interruption. + execution.setStatus(BatchStatus.STOPPED); + execution.setExitStatus(ExitStatus.COMPLETED); + if (logger.isDebugEnabled()) { + logger.debug("Job execution was stopped: " + execution); + } } - }catch(JobInterruptedException e){ - if(logger.isInfoEnabled()){ - logger.info("Encountered interruption executing job: " - +e.getMessage()); + } catch (JobInterruptedException e) { + if (logger.isInfoEnabled()) { + logger.info("Encountered interruption executing job: " + + e.getMessage()); } - if(logger.isDebugEnabled()){ - logger.debug("Full exception",e); + if (logger.isDebugEnabled()) { + logger.debug("Full exception", e); } - execution.setExitStatus(getDefaultExitStatusForFailure(e,execution)); - execution.setStatus(BatchStatus.max(BatchStatus.STOPPED,e.getStatus())); + execution.setExitStatus(getDefaultExitStatusForFailure(e, execution)); + execution.setStatus(BatchStatus.max(BatchStatus.STOPPED, e.getStatus())); execution.addFailureException(e); - }catch(Throwable t){ - logger.error("Encountered fatal error executing job",t); - execution.setExitStatus(getDefaultExitStatusForFailure(t,execution)); + } catch (Throwable t) { + logger.error("Encountered fatal error executing job", t); + execution.setExitStatus(getDefaultExitStatusForFailure(t, execution)); execution.setStatus(BatchStatus.FAILED); execution.addFailureException(t); - }finally{ - try{ - if(execution.getStatus().isLessThanOrEqualTo(BatchStatus.STOPPED) - &&execution.getStepExecutions().isEmpty()){ - ExitStatus exitStatus=execution.getExitStatus(); - ExitStatus newExitStatus= - ExitStatus.NOOP.addExitDescription("All steps already completed or no steps configured for this job."); - execution.setExitStatus(exitStatus.and(newExitStatus)); - } - timerSample.stop(BatchMetrics.createTimer("job","Job duration", - Tag.of("name",execution.getJobInstance().getJobName()), - Tag.of("status",execution.getExitStatus().getExitCode()) - )); - longTaskTimerSample.stop(); - execution.setEndTime(new Date()); - try{ - listener.afterJob(execution); - }catch(Exception e){ - logger.error("Exception encountered in afterJob callback",e); - } - jobRepository.update(execution); - }finally{ - JobSynchronizationManager.release(); - } - } + } finally { + try { + if (execution.getStatus().isLessThanOrEqualTo(BatchStatus.STOPPED) + && execution.getStepExecutions().isEmpty()) { + ExitStatus exitStatus = execution.getExitStatus(); + ExitStatus newExitStatus = + ExitStatus.NOOP.addExitDescription("All steps already completed or no steps configured for this job."); + execution.setExitStatus(exitStatus.and(newExitStatus)); + } + timerSample.stop(BatchMetrics.createTimer("job", "Job duration", + Tag.of("name", execution.getJobInstance().getJobName()), + Tag.of("status", execution.getExitStatus().getExitCode()) + )); + longTaskTimerSample.stop(); + execution.setEndTime(new Date()); + try { + listener.afterJob(execution); + } catch (Exception e) { + logger.error("Exception encountered in afterJob callback", e); + } + jobRepository.update(execution); + } finally { + JobSynchronizationManager.release(); } + } +} ``` ##### JobSynchronizationManager.register(execution); ```java - @Nullable -public C register(@Nullable E execution){ - if(execution==null){ +@Nullable +public C register(@Nullable E execution) { + if (execution == null) { return null; + } + getCurrent().push(execution); + C context; + synchronized (contexts) { + context = contexts.get(execution); + if (context == null) { + context = createNewContext(execution, null); + contexts.put(execution, context); } - getCurrent().push(execution); - C context; -synchronized (contexts){ - context=contexts.get(execution); - if(context==null){ - context=createNewContext(execution,null); - contexts.put(execution,context); - } - } - increment(); - return context; - } + } + increment(); + return context; +} ``` `SynchronizationManagerSupport`的泛型为`JobExecution, JobContext` @@ -978,20 +970,20 @@ private final Map contexts=new ConcurrentHashMap<>(); > `increment();` ```java - public void increment(){ - E current=getCurrent().peek(); - if(current!=null){ +public void increment() { + E current = getCurrent().peek(); + if (current != null) { AtomicInteger count; -synchronized (counts){ - count=counts.get(current); - if(count==null){ - count=new AtomicInteger(); - counts.put(current,count); - } + synchronized (counts) { + count = counts.get(current); + if (count == null) { + count = new AtomicInteger(); + counts.put(current, count); + } } count.incrementAndGet(); - } - } + } +} ``` 简单来说就是里面三个属性不存在就创建。存在把`executionHolder`更新。`count`++。 @@ -1041,10 +1033,10 @@ private final Map contexts=new ConcurrentHashMap<>(); ##### updateStatus(execution, BatchStatus.STARTED); ```java - private void updateStatus(JobExecution jobExecution,BatchStatus status){ - jobExecution.setStatus(status); - jobRepository.update(jobExecution); - } +private void updateStatus(JobExecution jobExecution, BatchStatus status) { + jobExecution.setStatus(status); + jobRepository.update(jobExecution); +} ``` > jobExecution.setStatus(status); @@ -1058,24 +1050,24 @@ private final Map contexts=new ConcurrentHashMap<>(); `SimpleJobRepository#update(JobExecution jobExecution)` ```java - @Override -public void update(JobExecution jobExecution){ +@Override +public void update(JobExecution jobExecution) { - Assert.notNull(jobExecution,"JobExecution cannot be null."); - Assert.notNull(jobExecution.getJobId(),"JobExecution must have a Job ID set."); - Assert.notNull(jobExecution.getId(),"JobExecution must be already saved (have an id assigned)."); + Assert.notNull(jobExecution, "JobExecution cannot be null."); + Assert.notNull(jobExecution.getJobId(), "JobExecution must have a Job ID set."); + Assert.notNull(jobExecution.getId(), "JobExecution must be already saved (have an id assigned)."); - jobExecution.setLastUpdated(new Date(System.currentTimeMillis())); + jobExecution.setLastUpdated(new Date(System.currentTimeMillis())); - jobExecutionDao.synchronizeStatus(jobExecution); - if(jobExecution.getStatus()==BatchStatus.STOPPING&&jobExecution.getEndTime()!=null){ - if(logger.isInfoEnabled()){ - logger.info("Upgrading job execution status from STOPPING to STOPPED since it has already ended."); + jobExecutionDao.synchronizeStatus(jobExecution); + if (jobExecution.getStatus() == BatchStatus.STOPPING && jobExecution.getEndTime() != null) { + if (logger.isInfoEnabled()) { + logger.info("Upgrading job execution status from STOPPING to STOPPED since it has already ended."); } jobExecution.upgradeStatus(BatchStatus.STOPPED); - } - jobExecutionDao.updateJobExecution(jobExecution); - } + } + jobExecutionDao.updateJobExecution(jobExecution); +} ``` 先判断不能为空,设置最后修改时间。 @@ -1085,14 +1077,14 @@ public void update(JobExecution jobExecution){ `MapJobExecutionDao#synchronizeStatus(JobExecution jobExecution)` ```java - @Override -public void synchronizeStatus(JobExecution jobExecution){ - JobExecution saved=getJobExecution(jobExecution.getId()); - if(saved.getVersion().intValue()!=jobExecution.getVersion().intValue()){ +@Override +public void synchronizeStatus(JobExecution jobExecution) { + JobExecution saved = getJobExecution(jobExecution.getId()); + if (saved.getVersion().intValue() != jobExecution.getVersion().intValue()) { jobExecution.upgradeStatus(saved.getStatus()); jobExecution.setVersion(saved.getVersion()); - } - } + } +} ``` 版本不一样就更新,然后版本++,这里版本没有改变。回到`SimpleJobRepository#update(JobExecution jobExecution)` @@ -1103,23 +1095,23 @@ public void synchronizeStatus(JobExecution jobExecution){ `MapJobExecutionDao#updateJobExecution(JobExecution jobExecution)` ```java - @Override -public void updateJobExecution(JobExecution jobExecution){ - Long id=jobExecution.getId(); - Assert.notNull(id,"JobExecution is expected to have an id (should be saved already)"); - JobExecution persistedExecution=executionsById.get(id); - Assert.notNull(persistedExecution,"JobExecution must already be saved"); - -synchronized (jobExecution){ - if(!persistedExecution.getVersion().equals(jobExecution.getVersion())){ - throw new OptimisticLockingFailureException("Attempt to update job execution id="+id - +" with wrong version ("+jobExecution.getVersion()+"), where current version is " - +persistedExecution.getVersion()); +@Override +public void updateJobExecution(JobExecution jobExecution) { + Long id = jobExecution.getId(); + Assert.notNull(id, "JobExecution is expected to have an id (should be saved already)"); + JobExecution persistedExecution = executionsById.get(id); + Assert.notNull(persistedExecution, "JobExecution must already be saved"); + + synchronized (jobExecution) { + if (!persistedExecution.getVersion().equals(jobExecution.getVersion())) { + throw new OptimisticLockingFailureException("Attempt to update job execution id=" + id + + " with wrong version (" + jobExecution.getVersion() + "), where current version is " + + persistedExecution.getVersion()); } jobExecution.incrementVersion(); - executionsById.put(id,copy(jobExecution)); - } - } + executionsById.put(id, copy(jobExecution)); + } +} ``` 首先校验不能为空,版本不一致就抛出异常,这里基本都是使用乐观锁。官网有介绍。接着更新版本,保存数据。 @@ -1140,27 +1132,27 @@ synchronized (jobExecution){ `JobBuilderHelper#listener(Object listener)` ```java - /** +/** * Registers objects using the annotation based listener configuration. * * @param listener the object that has a method configured with listener annotation * @return this for fluent chaining */ -public B listener(Object listener){ - Set jobExecutionListenerMethods=new HashSet<>(); - jobExecutionListenerMethods.addAll(ReflectionUtils.findMethod(listener.getClass(),BeforeJob.class)); - jobExecutionListenerMethods.addAll(ReflectionUtils.findMethod(listener.getClass(),AfterJob.class)); +public B listener(Object listener) { + Set jobExecutionListenerMethods = new HashSet<>(); + jobExecutionListenerMethods.addAll(ReflectionUtils.findMethod(listener.getClass(), BeforeJob.class)); + jobExecutionListenerMethods.addAll(ReflectionUtils.findMethod(listener.getClass(), AfterJob.class)); - if(jobExecutionListenerMethods.size()>0){ - JobListenerFactoryBean factory=new JobListenerFactoryBean(); + if (jobExecutionListenerMethods.size() > 0) { + JobListenerFactoryBean factory = new JobListenerFactoryBean(); factory.setDelegate(listener); - properties.addJobExecutionListener((JobExecutionListener)factory.getObject()); - } + properties.addJobExecutionListener((JobExecutionListener) factory.getObject()); + } -@SuppressWarnings("unchecked") - B result=(B)this; - return result; - } + @SuppressWarnings("unchecked") + B result = (B) this; + return result; +} ``` 参数是一个 object,里面只要有`BeforeJob.class`或者`AfterJob.class`注解也是`JobExecutionListener` @@ -1171,28 +1163,28 @@ public B listener(Object listener){ ##### doExecute(execution); ```java - @Override -protected void doExecute(JobExecution execution)throws JobInterruptedException,JobRestartException, - StartLimitExceededException{ - StepExecution stepExecution=null; - for(Step step:steps){ - stepExecution=handleStep(step,execution); - if(stepExecution.getStatus()!=BatchStatus.COMPLETED){ - break; - } +@Override +protected void doExecute(JobExecution execution) throws JobInterruptedException, JobRestartException, + StartLimitExceededException { + StepExecution stepExecution = null; + for (Step step : steps) { + stepExecution = handleStep(step, execution); + if (stepExecution.getStatus() != BatchStatus.COMPLETED) { + break; } + } - // - // Update the job status to be the same as the last step - // - if(stepExecution!=null){ - if(logger.isDebugEnabled()){ - logger.debug("Upgrading JobExecution status: "+stepExecution); + // + // Update the job status to be the same as the last step + // + if (stepExecution != null) { + if (logger.isDebugEnabled()) { + logger.debug("Upgrading JobExecution status: " + stepExecution); } execution.upgradeStatus(stepExecution.getStatus()); execution.setExitStatus(stepExecution.getExitStatus()); - } - } + } +} ``` 循环所有的`steps`。 @@ -1200,21 +1192,21 @@ protected void doExecute(JobExecution execution)throws JobInterruptedException,J > stepExecution = handleStep(step, execution); ```java - protected final StepExecution handleStep(Step step,JobExecution execution) - throws JobInterruptedException,JobRestartException, - StartLimitExceededException{ - return stepHandler.handleStep(step,execution); +protected final StepExecution handleStep(Step step, JobExecution execution) + throws JobInterruptedException, JobRestartException, + StartLimitExceededException { + return stepHandler.handleStep(step, execution); - } +} ``` 这里的`stepHandler`对象,是`AbstractJob#setJobRepository(JobRepository jobRepository)`的时候创建的。代码如下 ```java - public void setJobRepository(JobRepository jobRepository){ - this.jobRepository=jobRepository; - stepHandler=new SimpleStepHandler(jobRepository); - } +public void setJobRepository(JobRepository jobRepository) { + this.jobRepository = jobRepository; + stepHandler = new SimpleStepHandler(jobRepository); +} ``` ###### SimpleStepHandler#handleStep(Step step, JobExecution execution) @@ -1222,50 +1214,48 @@ protected void doExecute(JobExecution execution)throws JobInterruptedException,J 接着到了`SimpleStepHandler#handleStep(Step step, JobExecution execution)` ```java - @Override -public StepExecution handleStep(Step step,JobExecution execution)throws JobInterruptedException, - JobRestartException,StartLimitExceededException{ - if(execution.isStopping()){ +@Override +public StepExecution handleStep(Step step, JobExecution execution) throws JobInterruptedException, + JobRestartException, StartLimitExceededException { + if (execution.isStopping()) { throw new JobInterruptedException("JobExecution interrupted."); - } - JobInstance jobInstance=execution.getJobInstance(); - StepExecution lastStepExecution=jobRepository.getLastStepExecution(jobInstance,step.getName()); - if(stepExecutionPartOfExistingJobExecution(execution,lastStepExecution)){ - lastStepExecution=null; - } - StepExecution currentStepExecution=lastStepExecution; - if(shouldStart(lastStepExecution,execution,step)){ - currentStepExecution=execution.createStepExecution(step.getName()); - boolean isRestart=(lastStepExecution!=null&&!lastStepExecution.getStatus().equals( - BatchStatus.COMPLETED)); - if(isRestart){ - currentStepExecution.setExecutionContext(lastStepExecution.getExecutionContext()); - if(lastStepExecution.getExecutionContext().containsKey("batch.executed")){ - currentStepExecution.getExecutionContext().remove("batch.executed"); - } - } - else{ - currentStepExecution.setExecutionContext(new ExecutionContext(executionContext)); + } + JobInstance jobInstance = execution.getJobInstance(); + StepExecution lastStepExecution = jobRepository.getLastStepExecution(jobInstance, step.getName()); + if (stepExecutionPartOfExistingJobExecution(execution, lastStepExecution)) { + lastStepExecution = null; + } + StepExecution currentStepExecution = lastStepExecution; + if (shouldStart(lastStepExecution, execution, step)) { + currentStepExecution = execution.createStepExecution(step.getName()); + boolean isRestart = (lastStepExecution != null && !lastStepExecution.getStatus().equals( + BatchStatus.COMPLETED)); + if (isRestart) { + currentStepExecution.setExecutionContext(lastStepExecution.getExecutionContext()); + if (lastStepExecution.getExecutionContext().containsKey("batch.executed")) { + currentStepExecution.getExecutionContext().remove("batch.executed"); + } + } else { + currentStepExecution.setExecutionContext(new ExecutionContext(executionContext)); } jobRepository.add(currentStepExecution); - try{ - step.execute(currentStepExecution); - currentStepExecution.getExecutionContext().put("batch.executed",true); - } - catch(JobInterruptedException e){ - execution.setStatus(BatchStatus.STOPPING); - throw e; + try { + step.execute(currentStepExecution); + currentStepExecution.getExecutionContext().put("batch.executed", true); + } catch (JobInterruptedException e) { + execution.setStatus(BatchStatus.STOPPING); + throw e; } jobRepository.updateExecutionContext(execution); - if(currentStepExecution.getStatus()==BatchStatus.STOPPING - ||currentStepExecution.getStatus()==BatchStatus.STOPPED){ - // Ensure that the job gets the message that it is stopping - execution.setStatus(BatchStatus.STOPPING); - throw new JobInterruptedException("Job interrupted by step execution"); - } - } - return currentStepExecution; + if (currentStepExecution.getStatus() == BatchStatus.STOPPING + || currentStepExecution.getStatus() == BatchStatus.STOPPED) { + // Ensure that the job gets the message that it is stopping + execution.setStatus(BatchStatus.STOPPING); + throw new JobInterruptedException("Job interrupted by step execution"); } + } + return currentStepExecution; +} ``` 首先获取`JobInstance`对象,是`MapJobInstanceDao`对象创建的。 @@ -1275,46 +1265,46 @@ public StepExecution handleStep(Step step,JobExecution execution)throws JobInter `SimpleJobRepository#getLastStepExecution(JobInstance jobInstance, String stepName)` ```java - @Override +@Override @Nullable -public StepExecution getLastStepExecution(JobInstance jobInstance,String stepName){ - StepExecution latest=stepExecutionDao.getLastStepExecution(jobInstance,stepName); +public StepExecution getLastStepExecution(JobInstance jobInstance, String stepName) { + StepExecution latest = stepExecutionDao.getLastStepExecution(jobInstance, stepName); - if(latest!=null){ - ExecutionContext stepExecutionContext=ecDao.getExecutionContext(latest); + if (latest != null) { + ExecutionContext stepExecutionContext = ecDao.getExecutionContext(latest); latest.setExecutionContext(stepExecutionContext); - ExecutionContext jobExecutionContext=ecDao.getExecutionContext(latest.getJobExecution()); + ExecutionContext jobExecutionContext = ecDao.getExecutionContext(latest.getJobExecution()); latest.getJobExecution().setExecutionContext(jobExecutionContext); - } + } - return latest; - } + return latest; +} ``` 进入`MapStepExecutionDao#getLastStepExecution(JobInstance jobInstance, String stepName)` ```java - @Override -public StepExecution getLastStepExecution(JobInstance jobInstance,String stepName){ - StepExecution latest=null; - for(StepExecution stepExecution:executionsByStepExecutionId.values()){ - if(!stepExecution.getStepName().equals(stepName) - ||stepExecution.getJobExecution().getJobInstance().getInstanceId()!=jobInstance.getInstanceId()){ - continue; - } - if(latest==null){ - latest=stepExecution; - } - if(latest.getStartTime().getTime() `SimpleStepHandler#shouldStart(StepExecution lastStepExecution, JobExecution jobExecution, Step step)` ```java -//给定一个步骤和配置,如果该步骤应该开始,则返回 true;如果不应该启动,则返回 false;如果作业应该完成,则抛出异常。 -protected boolean shouldStart(StepExecution lastStepExecution,JobExecution jobExecution,Step step) - throws JobRestartException,StartLimitExceededException{ +// 给定一个步骤和配置,如果该步骤应该开始,则返回 true;如果不应该启动,则返回 false;如果作业应该完成,则抛出异常。 +protected boolean shouldStart(StepExecution lastStepExecution, JobExecution jobExecution, Step step) + throws JobRestartException, StartLimitExceededException { - BatchStatus stepStatus; - if(lastStepExecution==null){ - stepStatus=BatchStatus.STARTING; - } - else{ - stepStatus=lastStepExecution.getStatus(); - } + BatchStatus stepStatus; + if (lastStepExecution == null) { + stepStatus = BatchStatus.STARTING; + } else { + stepStatus = lastStepExecution.getStatus(); + } - if(stepStatus==BatchStatus.UNKNOWN){ + if (stepStatus == BatchStatus.UNKNOWN) { throw new JobRestartException("Cannot restart step from UNKNOWN status. " - +"The last execution ended with a failure that could not be rolled back, " - +"so it may be dangerous to proceed. Manual intervention is probably necessary."); - } + + "The last execution ended with a failure that could not be rolled back, " + + "so it may be dangerous to proceed. Manual intervention is probably necessary."); + } - if((stepStatus==BatchStatus.COMPLETED&&!step.isAllowStartIfComplete()) - ||stepStatus==BatchStatus.ABANDONED){ + if ((stepStatus == BatchStatus.COMPLETED && !step.isAllowStartIfComplete()) + || stepStatus == BatchStatus.ABANDONED) { // step is complete, false should be returned, indicating that the // step should not be started - if(logger.isInfoEnabled()){ - logger.info("Step already complete or not restartable, so no action to execute: "+lastStepExecution); + if (logger.isInfoEnabled()) { + logger.info("Step already complete or not restartable, so no action to execute: " + lastStepExecution); } return false; - } + } - if(jobRepository.getStepExecutionCount(jobExecution.getJobInstance(),step.getName())chunk(2) - .reader(itemReader()) - .writer(itemWriter()) - .startLimit(2) - .build(); - taskletStep.setAllowStartIfComplete(true); - return taskletStep; - } +@Bean +public Step step() { + TaskletStep taskletStep = stepBuilders.get("test-step") + .chunk(2) + .reader(itemReader()) + .writer(itemWriter()) + .startLimit(2) + .build(); + taskletStep.setAllowStartIfComplete(true); + return taskletStep; +} ``` 改一下,设置成重复执行。`taskletStep.setAllowStartIfComplete(true);` @@ -1396,11 +1384,11 @@ public Step step(){ > currentStepExecution = execution.createStepExecution(step.getName()); ```java - public StepExecution createStepExecution(String stepName){ - StepExecution stepExecution=new StepExecution(stepName,this); - this.stepExecutions.add(stepExecution); - return stepExecution; - } +public StepExecution createStepExecution(String stepName) { + StepExecution stepExecution = new StepExecution(stepName, this); + this.stepExecutions.add(stepExecution); + return stepExecution; +} ``` 创建一个`StepExecution`并返回。`stepExecutions.add` @@ -1418,14 +1406,14 @@ public Step step(){ `SimpleJobRepository#add(StepExecution stepExecution)` ```java - @Override -public void add(StepExecution stepExecution){ - validateStepExecution(stepExecution); +@Override +public void add(StepExecution stepExecution) { + validateStepExecution(stepExecution); - stepExecution.setLastUpdated(new Date(System.currentTimeMillis())); - stepExecutionDao.saveStepExecution(stepExecution); - ecDao.saveExecutionContext(stepExecution); - } + stepExecution.setLastUpdated(new Date(System.currentTimeMillis())); + stepExecutionDao.saveStepExecution(stepExecution); + ecDao.saveExecutionContext(stepExecution); +} ``` `validateStepExecution(stepExecution)`:校验 @@ -1434,23 +1422,23 @@ public void add(StepExecution stepExecution){ > stepExecutionDao.saveStepExecution(stepExecution); ```java - @Override -public void saveStepExecution(StepExecution stepExecution){ - Assert.isTrue(stepExecution.getId()==null,"stepExecution id was not null"); - Assert.isTrue(stepExecution.getVersion()==null,"stepExecution version was not null"); - Assert.notNull(stepExecution.getJobExecutionId(),"JobExecution must be saved already."); - Map executions=executionsByJobExecutionId.get(stepExecution.getJobExecutionId()); - if(executions==null){ - executions=new ConcurrentHashMap<>(); - executionsByJobExecutionId.put(stepExecution.getJobExecutionId(),executions); - } - stepExecution.setId(currentId.incrementAndGet()); - stepExecution.incrementVersion(); - StepExecution copy=copy(stepExecution); - executions.put(stepExecution.getId(),copy); - executionsByStepExecutionId.put(stepExecution.getId(),copy); +@Override +public void saveStepExecution(StepExecution stepExecution) { + Assert.isTrue(stepExecution.getId() == null, "stepExecution id was not null"); + Assert.isTrue(stepExecution.getVersion() == null, "stepExecution version was not null"); + Assert.notNull(stepExecution.getJobExecutionId(), "JobExecution must be saved already."); + Map executions = executionsByJobExecutionId.get(stepExecution.getJobExecutionId()); + if (executions == null) { + executions = new ConcurrentHashMap<>(); + executionsByJobExecutionId.put(stepExecution.getJobExecutionId(), executions); + } + stepExecution.setId(currentId.incrementAndGet()); + stepExecution.incrementVersion(); + StepExecution copy = copy(stepExecution); + executions.put(stepExecution.getId(), copy); + executionsByStepExecutionId.put(stepExecution.getId(), copy); - } +} ``` 先校验,再新增一条数据。 `StepExecution`里面包含了`JobExecution`。 `JobExecutionDao`维护`jobExecution`的 Id。`StepExecution` @@ -1459,17 +1447,18 @@ public void saveStepExecution(StepExecution stepExecution){ > ecDao.saveExecutionContext(stepExecution); ```java - @Override -public void saveExecutionContext(StepExecution stepExecution){ - updateExecutionContext(stepExecution); - } @Override -public void updateExecutionContext(StepExecution stepExecution){ - ExecutionContext executionContext=stepExecution.getExecutionContext(); - if(executionContext!=null){ - contexts.put(ContextKey.step(stepExecution.getId()),copy(executionContext)); - } - } +public void saveExecutionContext(StepExecution stepExecution) { + updateExecutionContext(stepExecution); +} + +@Override +public void updateExecutionContext(StepExecution stepExecution) { + ExecutionContext executionContext = stepExecution.getExecutionContext(); + if (executionContext != null) { + contexts.put(ContextKey.step(stepExecution.getId()), copy(executionContext)); + } +} ``` 新增一条数据。前面已经新增了一条`job`。现在新增的是`step`。 @@ -1480,128 +1469,120 @@ public void updateExecutionContext(StepExecution stepExecution){ `AbstractStep#execute(StepExecution stepExecution)` ```java - /** +/** * Template method for step execution logic - calls abstract methods for resource initialization ( * {@link #open(ExecutionContext)}), execution logic ({@link #doExecute(StepExecution)}) and resource closing ( * {@link #close(ExecutionContext)}). */ @Override -public final void execute(StepExecution stepExecution)throws JobInterruptedException, - UnexpectedJobExecutionException{ +public final void execute(StepExecution stepExecution) throws JobInterruptedException, + UnexpectedJobExecutionException { - Assert.notNull(stepExecution,"stepExecution must not be null"); + Assert.notNull(stepExecution, "stepExecution must not be null"); - if(logger.isDebugEnabled()){ - logger.debug("Executing: id="+stepExecution.getId()); - } - stepExecution.setStartTime(new Date()); - stepExecution.setStatus(BatchStatus.STARTED); - Timer.Sample sample=BatchMetrics.createTimerSample(); - getJobRepository().update(stepExecution); + if (logger.isDebugEnabled()) { + logger.debug("Executing: id=" + stepExecution.getId()); + } + stepExecution.setStartTime(new Date()); + stepExecution.setStatus(BatchStatus.STARTED); + Timer.Sample sample = BatchMetrics.createTimerSample(); + getJobRepository().update(stepExecution); - // Start with a default value that will be trumped by anything - ExitStatus exitStatus=ExitStatus.EXECUTING; + // Start with a default value that will be trumped by anything + ExitStatus exitStatus = ExitStatus.EXECUTING; - doExecutionRegistration(stepExecution); + doExecutionRegistration(stepExecution); - try{ + try { getCompositeListener().beforeStep(stepExecution); open(stepExecution.getExecutionContext()); - try{ - doExecute(stepExecution); - } - catch(RepeatException e){ - throw e.getCause(); + try { + doExecute(stepExecution); + } catch (RepeatException e) { + throw e.getCause(); } - exitStatus=ExitStatus.COMPLETED.and(stepExecution.getExitStatus()); + exitStatus = ExitStatus.COMPLETED.and(stepExecution.getExitStatus()); // Check if someone is trying to stop us - if(stepExecution.isTerminateOnly()){ - throw new JobInterruptedException("JobExecution interrupted."); + if (stepExecution.isTerminateOnly()) { + throw new JobInterruptedException("JobExecution interrupted."); } // Need to upgrade here not set, in case the execution was stopped stepExecution.upgradeStatus(BatchStatus.COMPLETED); - if(logger.isDebugEnabled()){ - logger.debug("Step execution success: id="+stepExecution.getId()); - } + if (logger.isDebugEnabled()) { + logger.debug("Step execution success: id=" + stepExecution.getId()); } - catch(Throwable e){ + } catch (Throwable e) { stepExecution.upgradeStatus(determineBatchStatus(e)); - exitStatus=exitStatus.and(getDefaultExitStatusForFailure(e)); + exitStatus = exitStatus.and(getDefaultExitStatusForFailure(e)); stepExecution.addFailureException(e); - if(stepExecution.getStatus()==BatchStatus.STOPPED){ - logger.info(String.format("Encountered interruption executing step %s in job %s : %s",name,stepExecution.getJobExecution().getJobInstance().getJobName(),e.getMessage())); - if(logger.isDebugEnabled()){ - logger.debug("Full exception",e); - } - } - else{ - logger.error(String.format("Encountered an error executing step %s in job %s",name,stepExecution.getJobExecution().getJobInstance().getJobName()),e); - } - } - finally{ - - try{ - // Update the step execution to the latest known value so the - // listeners can act on it - exitStatus=exitStatus.and(stepExecution.getExitStatus()); - stepExecution.setExitStatus(exitStatus); - exitStatus=exitStatus.and(getCompositeListener().afterStep(stepExecution)); - } - catch(Exception e){ - logger.error(String.format("Exception in afterStep callback in step %s in job %s",name,stepExecution.getJobExecution().getJobInstance().getJobName()),e); - } - - try{ - getJobRepository().updateExecutionContext(stepExecution); - } - catch(Exception e){ - stepExecution.setStatus(BatchStatus.UNKNOWN); - exitStatus=exitStatus.and(ExitStatus.UNKNOWN); - stepExecution.addFailureException(e); - logger.error(String.format("Encountered an error saving batch meta data for step %s in job %s. " - +"This job is now in an unknown state and should not be restarted.",name,stepExecution.getJobExecution().getJobInstance().getJobName()),e); - } - - sample.stop(BatchMetrics.createTimer("step","Step duration", - Tag.of("job.name",stepExecution.getJobExecution().getJobInstance().getJobName()), - Tag.of("name",stepExecution.getStepName()), - Tag.of("status",stepExecution.getExitStatus().getExitCode()) + if (stepExecution.getStatus() == BatchStatus.STOPPED) { + logger.info(String.format("Encountered interruption executing step %s in job %s : %s", name, stepExecution.getJobExecution().getJobInstance().getJobName(), e.getMessage())); + if (logger.isDebugEnabled()) { + logger.debug("Full exception", e); + } + } else { + logger.error(String.format("Encountered an error executing step %s in job %s", name, stepExecution.getJobExecution().getJobInstance().getJobName()), e); + } + } finally { + + try { + // Update the step execution to the latest known value so the + // listeners can act on it + exitStatus = exitStatus.and(stepExecution.getExitStatus()); + stepExecution.setExitStatus(exitStatus); + exitStatus = exitStatus.and(getCompositeListener().afterStep(stepExecution)); + } catch (Exception e) { + logger.error(String.format("Exception in afterStep callback in step %s in job %s", name, stepExecution.getJobExecution().getJobInstance().getJobName()), e); + } + + try { + getJobRepository().updateExecutionContext(stepExecution); + } catch (Exception e) { + stepExecution.setStatus(BatchStatus.UNKNOWN); + exitStatus = exitStatus.and(ExitStatus.UNKNOWN); + stepExecution.addFailureException(e); + logger.error(String.format("Encountered an error saving batch meta data for step %s in job %s. " + + "This job is now in an unknown state and should not be restarted.", name, stepExecution.getJobExecution().getJobInstance().getJobName()), e); + } + + sample.stop(BatchMetrics.createTimer("step", "Step duration", + Tag.of("job.name", stepExecution.getJobExecution().getJobInstance().getJobName()), + Tag.of("name", stepExecution.getStepName()), + Tag.of("status", stepExecution.getExitStatus().getExitCode()) )); stepExecution.setEndTime(new Date()); stepExecution.setExitStatus(exitStatus); - Duration stepExecutionDuration=BatchMetrics.calculateDuration(stepExecution.getStartTime(),stepExecution.getEndTime()); - if(logger.isInfoEnabled()){ - logger.info("Step: ["+stepExecution.getStepName()+"] executed in "+BatchMetrics.formatDuration(stepExecutionDuration)); + Duration stepExecutionDuration = BatchMetrics.calculateDuration(stepExecution.getStartTime(), stepExecution.getEndTime()); + if (logger.isInfoEnabled()) { + logger.info("Step: [" + stepExecution.getStepName() + "] executed in " + BatchMetrics.formatDuration(stepExecutionDuration)); } - try{ - getJobRepository().update(stepExecution); - } - catch(Exception e){ - stepExecution.setStatus(BatchStatus.UNKNOWN); - stepExecution.setExitStatus(exitStatus.and(ExitStatus.UNKNOWN)); - stepExecution.addFailureException(e); - logger.error(String.format("Encountered an error saving batch meta data for step %s in job %s. " - +"This job is now in an unknown state and should not be restarted.",name,stepExecution.getJobExecution().getJobInstance().getJobName()),e); + try { + getJobRepository().update(stepExecution); + } catch (Exception e) { + stepExecution.setStatus(BatchStatus.UNKNOWN); + stepExecution.setExitStatus(exitStatus.and(ExitStatus.UNKNOWN)); + stepExecution.addFailureException(e); + logger.error(String.format("Encountered an error saving batch meta data for step %s in job %s. " + + "This job is now in an unknown state and should not be restarted.", name, stepExecution.getJobExecution().getJobInstance().getJobName()), e); } - try{ - close(stepExecution.getExecutionContext()); - } - catch(Exception e){ - logger.error(String.format("Exception while closing step execution resources in step %s in job %s",name,stepExecution.getJobExecution().getJobInstance().getJobName()),e); - stepExecution.addFailureException(e); + try { + close(stepExecution.getExecutionContext()); + } catch (Exception e) { + logger.error(String.format("Exception while closing step execution resources in step %s in job %s", name, stepExecution.getJobExecution().getJobInstance().getJobName()), e); + stepExecution.addFailureException(e); } doExecutionRelease(); - if(logger.isDebugEnabled()){ - logger.debug("Step execution complete: "+stepExecution.getSummary()); - } - } + if (logger.isDebugEnabled()) { + logger.debug("Step execution complete: " + stepExecution.getSummary()); } + } +} ``` 前面几行校验,打印日志,计时器。 @@ -1609,15 +1590,15 @@ public final void execute(StepExecution stepExecution)throws JobInterruptedExcep > getJobRepository().update(stepExecution); ```java - @Override -public void update(StepExecution stepExecution){ - validateStepExecution(stepExecution); - Assert.notNull(stepExecution.getId(),"StepExecution must already be saved (have an id assigned)"); +@Override +public void update(StepExecution stepExecution) { + validateStepExecution(stepExecution); + Assert.notNull(stepExecution.getId(), "StepExecution must already be saved (have an id assigned)"); - stepExecution.setLastUpdated(new Date(System.currentTimeMillis())); - stepExecutionDao.updateStepExecution(stepExecution); - checkForInterruption(stepExecution); - } + stepExecution.setLastUpdated(new Date(System.currentTimeMillis())); + stepExecutionDao.updateStepExecution(stepExecution); + checkForInterruption(stepExecution); +} ``` 校验参数,设置最后修改时间。 @@ -1639,7 +1620,7 @@ key 为 Step 的 id 回到`SimpleJobRepository#update(StepExecution stepExecution)`。 ```java - /** +/** * Check to determine whether or not the JobExecution that is the parent of * the provided StepExecution has been interrupted. If, after synchronizing * the status with the database, the status has been updated to STOPPING, @@ -1647,14 +1628,14 @@ key 为 Step 的 id * * @param stepExecution */ -private void checkForInterruption(StepExecution stepExecution){ - JobExecution jobExecution=stepExecution.getJobExecution(); - jobExecutionDao.synchronizeStatus(jobExecution); - if(jobExecution.isStopping()){ +private void checkForInterruption(StepExecution stepExecution) { + JobExecution jobExecution = stepExecution.getJobExecution(); + jobExecutionDao.synchronizeStatus(jobExecution); + if (jobExecution.isStopping()) { logger.info("Parent JobExecution is stopped, so passing message on to StepExecution"); stepExecution.setTerminateOnly(); - } - } + } +} ``` 检查以确定作为所提供的 StepExecution 的父级 JobExecution 是否已被中断。如果与数据库同步状态后,状态已更新为 @@ -1678,10 +1659,10 @@ STOPPING,则作业已中断。 > open(stepExecution.getExecutionContext()); ```java - @Override -protected void open(ExecutionContext ctx)throws Exception{ - stream.open(ctx); - } +@Override +protected void open(ExecutionContext ctx) throws Exception { + stream.open(ctx); +} ``` 里面的`stream`存在一个`private final List streams = new ArrayList<>();`属性,也就是`ItemStream`对象。里面的 @@ -1691,33 +1672,32 @@ open 方法会在这里执行。 ```java @Override -protected void doExecute(StepExecution stepExecution)throws Exception{ - stepExecution.getExecutionContext().put(TASKLET_TYPE_KEY,tasklet.getClass().getName()); - stepExecution.getExecutionContext().put(STEP_TYPE_KEY,this.getClass().getName()); - stream.update(stepExecution.getExecutionContext()); - getJobRepository().updateExecutionContext(stepExecution); -final Semaphore semaphore=createSemaphore(); - stepOperations.iterate(new StepContextRepeatCallback(stepExecution){ -@Override -public RepeatStatus doInChunkContext(RepeatContext repeatContext,ChunkContext chunkContext) - throws Exception{ - StepExecution stepExecution=chunkContext.getStepContext().getStepExecution(); - interruptionPolicy.checkInterrupted(stepExecution); - - RepeatStatus result; - try{ - result=new TransactionTemplate(transactionManager,transactionAttribute) - .execute(new ChunkTransactionCallback(chunkContext,semaphore)); - } - catch(UncheckedTransactionException e){ - throw(Exception)e.getCause(); - } - chunkListener.afterChunk(chunkContext); - interruptionPolicy.checkInterrupted(stepExecution); - return result==null?RepeatStatus.FINISHED:result; - } - }); +protected void doExecute(StepExecution stepExecution) throws Exception { + stepExecution.getExecutionContext().put(TASKLET_TYPE_KEY, tasklet.getClass().getName()); + stepExecution.getExecutionContext().put(STEP_TYPE_KEY, this.getClass().getName()); + stream.update(stepExecution.getExecutionContext()); + getJobRepository().updateExecutionContext(stepExecution); + final Semaphore semaphore = createSemaphore(); + stepOperations.iterate(new StepContextRepeatCallback(stepExecution) { + @Override + public RepeatStatus doInChunkContext(RepeatContext repeatContext, ChunkContext chunkContext) + throws Exception { + StepExecution stepExecution = chunkContext.getStepContext().getStepExecution(); + interruptionPolicy.checkInterrupted(stepExecution); + + RepeatStatus result; + try { + result = new TransactionTemplate(transactionManager, transactionAttribute) + .execute(new ChunkTransactionCallback(chunkContext, semaphore)); + } catch (UncheckedTransactionException e) { + throw (Exception) e.getCause(); + } + chunkListener.afterChunk(chunkContext); + interruptionPolicy.checkInterrupted(stepExecution); + return result == null ? RepeatStatus.FINISHED : result; } + }); +} ``` 首先在`stepExecution#ExecutionContext`set 了`batch.taskletType`和`batch.stepType` @@ -1738,7 +1718,7 @@ public RepeatStatus doInChunkContext(RepeatContext repeatContext,ChunkContext ch `RepeatTemplate#iterate(RepeatCallback callback)` ```java - /** +/** * Execute the batch callback until the completion policy decides that we * are finished. Wait for the whole batch to finish before returning even if * the task executor is asynchronous. @@ -1746,151 +1726,145 @@ public RepeatStatus doInChunkContext(RepeatContext repeatContext,ChunkContext ch * @see org.springframework.batch.repeat.RepeatOperations#iterate(org.springframework.batch.repeat.RepeatCallback) */ @Override -public RepeatStatus iterate(RepeatCallback callback){ +public RepeatStatus iterate(RepeatCallback callback) { - RepeatContext outer=RepeatSynchronizationManager.getContext(); + RepeatContext outer = RepeatSynchronizationManager.getContext(); - RepeatStatus result=RepeatStatus.CONTINUABLE; - try{ + RepeatStatus result = RepeatStatus.CONTINUABLE; + try { // This works with an asynchronous TaskExecutor: the // interceptors have to wait for the child processes. - result=executeInternal(callback); - } - finally{ + result = executeInternal(callback); + } finally { RepeatSynchronizationManager.clear(); - if(outer!=null){ - RepeatSynchronizationManager.register(outer); - } + if (outer != null) { + RepeatSynchronizationManager.register(outer); } + } - return result; - } + return result; +} ``` 第一次进来`outer`为空,进入`result = executeInternal(callback);`方法。 ```java - private RepeatStatus executeInternal(final RepeatCallback callback){ +private RepeatStatus executeInternal(final RepeatCallback callback) { - // Reset the termination policy if there is one... - RepeatContext context=start(); + // Reset the termination policy if there is one... + RepeatContext context = start(); - // Make sure if we are already marked complete before we start then no - // processing takes place. - boolean running=!isMarkedComplete(context); + // Make sure if we are already marked complete before we start then no + // processing takes place. + boolean running = !isMarkedComplete(context); - for(RepeatListener interceptor:listeners){ + for (RepeatListener interceptor : listeners) { interceptor.open(context); - running=running&&!isMarkedComplete(context); - if(!running) - break; - } - // Return value, default is to allow continued processing. - RepeatStatus result=RepeatStatus.CONTINUABLE; - - RepeatInternalState state=createInternalState(context); - // This is the list of exceptions thrown by all active callbacks - Collectionthrowables=state.getThrowables(); - // Keep a separate list of exceptions we handled that need to be - // rethrown - Collectiondeferred=new ArrayList<>(); - - try{ - - while(running){ - - /* - * Run the before interceptors here, not in the task executor so - * that they all happen in the same thread - it's easier for - * tracking batch status, amongst other things. - */ - for(int i=0;i throwables = state.getThrowables(); + // Keep a separate list of exceptions we handled that need to be + // rethrown + Collection deferred = new ArrayList<>(); + + try { + + while (running) { + + /* + * Run the before interceptors here, not in the task executor so + * that they all happen in the same thread - it's easier for + * tracking batch status, amongst other things. + */ + for (int i = 0; i < listeners.length; i++) { + RepeatListener interceptor = listeners[i]; + interceptor.before(context); + // Allow before interceptors to veto the batch by setting + // flag. + running = running && !isMarkedComplete(context); + } - // Check that we are still running (should always be true) ... - if(running){ + // Check that we are still running (should always be true) ... + if (running) { - try{ + try { - result=getNextResult(context,callback,state); - executeAfterInterceptors(context,result); + result = getNextResult(context, callback, state); + executeAfterInterceptors(context, result); - } - catch(Throwable throwable){ - doHandle(throwable,context,deferred); - } + } catch (Throwable throwable) { + doHandle(throwable, context, deferred); + } - // N.B. the order may be important here: - if(isComplete(context,result)||isMarkedComplete(context)||!deferred.isEmpty()){ - running=false; - } + // N.B. the order may be important here: + if (isComplete(context, result) || isMarkedComplete(context) || !deferred.isEmpty()) { + running = false; + } - } + } } - result=result.and(waitForResults(state)); - for(Throwable throwable:throwables){ - doHandle(throwable,context,deferred); + result = result.and(waitForResults(state)); + for (Throwable throwable : throwables) { + doHandle(throwable, context, deferred); } // Explicitly drop any references to internal state... - state=null; - } - /* - * No need for explicit catch here - if the business processing threw an - * exception it was already handled by the helper methods. An exception - * here is necessarily fatal. - */ - finally{ - try{ - if(!deferred.isEmpty()){ - Throwable throwable=deferred.iterator().next(); - if(logger.isDebugEnabled()){ - logger.debug("Handling fatal exception explicitly (rethrowing first of "+deferred.size()+"): " - +throwable.getClass().getName()+": "+throwable.getMessage()); - } - rethrow(throwable); - } - } - finally{ - try{ - for(int i=listeners.length;i-->0;){ - RepeatListener interceptor=listeners[i]; - interceptor.close(context); - } - } - finally{ - context.close(); - } - } - } - return result; + state = null; + } + /* + * No need for explicit catch here - if the business processing threw an + * exception it was already handled by the helper methods. An exception + * here is necessarily fatal. + */ finally { + try { + if (!deferred.isEmpty()) { + Throwable throwable = deferred.iterator().next(); + if (logger.isDebugEnabled()) { + logger.debug("Handling fatal exception explicitly (rethrowing first of " + deferred.size() + "): " + + throwable.getClass().getName() + ": " + throwable.getMessage()); + } + rethrow(throwable); + } + } finally { + try { + for (int i = listeners.length; i-- > 0; ) { + RepeatListener interceptor = listeners[i]; + interceptor.close(context); + } + } finally { + context.close(); + } } + } + return result; +} ``` > RepeatContext context = start(); ```java - /** +/** * Delegate to the {@link CompletionPolicy}. * * @return a {@link RepeatContext} object that can be used by the implementation to store * internal state for a batch step. - * * @see org.springframework.batch.repeat.CompletionPolicy#start(RepeatContext) */ -protected RepeatContext start(){ - RepeatContext parent=RepeatSynchronizationManager.getContext(); - RepeatContext context=completionPolicy.start(parent); - RepeatSynchronizationManager.register(context); - logger.debug("Starting repeat context."); - return context; - } +protected RepeatContext start() { + RepeatContext parent = RepeatSynchronizationManager.getContext(); + RepeatContext context = completionPolicy.start(parent); + RepeatSynchronizationManager.register(context); + logger.debug("Starting repeat context."); + return context; +} ``` 往`RepeatSynchronizationManager#ThreadLocal contextHolder = new ThreadLocal<>()`中新增一个`RepeatContext` @@ -1901,17 +1875,17 @@ protected RepeatContext start(){ 确保我们在开始之前是否已标记为完成,则不会进行任何处理。 ```java - private boolean isMarkedComplete(RepeatContext context){ - boolean complete=context.isCompleteOnly(); - if(context.getParent()!=null){ - complete=complete||isMarkedComplete(context.getParent()); - } - if(complete){ +private boolean isMarkedComplete(RepeatContext context) { + boolean complete = context.isCompleteOnly(); + if (context.getParent() != null) { + complete = complete || isMarkedComplete(context.getParent()); + } + if (complete) { logger.debug("Repeat is complete according to context alone."); - } - return complete; + } + return complete; - } +} ``` `complete`默认为 false,取反`running`为 true。 @@ -1927,9 +1901,9 @@ protected RepeatContext start(){ > RepeatInternalState state = createInternalState(context); ```java - protected RepeatInternalState createInternalState(RepeatContext context){ - return new RepeatInternalStateSupport(); - } +protected RepeatInternalState createInternalState(RepeatContext context) { + return new RepeatInternalStateSupport(); +} ``` > Collection throwables = state.getThrowables(); @@ -1947,15 +1921,15 @@ protected RepeatContext start(){ > result = getNextResult(context, callback, state); ```java - protected RepeatStatus getNextResult(RepeatContext context,RepeatCallback callback,RepeatInternalState state) - throws Throwable{ - update(context); - if(logger.isDebugEnabled()){ - logger.debug("Repeat operation about to start at count="+context.getStartedCount()); - } - return callback.doInIteration(context); +protected RepeatStatus getNextResult(RepeatContext context, RepeatCallback callback, RepeatInternalState state) + throws Throwable { + update(context); + if (logger.isDebugEnabled()) { + logger.debug("Repeat operation about to start at count=" + context.getStartedCount()); + } + return callback.doInIteration(context); - } +} ``` 解释一下三个参数,`context`,前面方法`start()`中 new 出来的`RepeatContextSupport`,当前里面什么都没有。 @@ -1971,36 +1945,35 @@ protected RepeatContext start(){ `StepContextRepeatCallback#doInIteration(RepeatContext context)` ```java - @Override -public RepeatStatus doInIteration(RepeatContext context)throws Exception{ +@Override +public RepeatStatus doInIteration(RepeatContext context) throws Exception { - // The StepContext has to be the same for all chunks, - // otherwise step-scoped beans will be re-initialised for each chunk. - StepContext stepContext=StepSynchronizationManager.register(stepExecution); - if(logger.isDebugEnabled()){ - logger.debug("Preparing chunk execution for StepContext: "+ObjectUtils.identityToString(stepContext)); - } + // The StepContext has to be the same for all chunks, + // otherwise step-scoped beans will be re-initialised for each chunk. + StepContext stepContext = StepSynchronizationManager.register(stepExecution); + if (logger.isDebugEnabled()) { + logger.debug("Preparing chunk execution for StepContext: " + ObjectUtils.identityToString(stepContext)); + } - ChunkContext chunkContext=attributeQueue.poll(); - if(chunkContext==null){ - chunkContext=new ChunkContext(stepContext); - } + ChunkContext chunkContext = attributeQueue.poll(); + if (chunkContext == null) { + chunkContext = new ChunkContext(stepContext); + } - try{ - if(logger.isDebugEnabled()){ - logger.debug("Chunk execution starting: queue size="+attributeQueue.size()); - } - return doInChunkContext(context,chunkContext); + try { + if (logger.isDebugEnabled()) { + logger.debug("Chunk execution starting: queue size=" + attributeQueue.size()); } - finally{ + return doInChunkContext(context, chunkContext); + } finally { // Still some stuff to do with the data in this chunk, // pass it back. - if(!chunkContext.isComplete()){ - attributeQueue.add(chunkContext); + if (!chunkContext.isComplete()) { + attributeQueue.add(chunkContext); } StepSynchronizationManager.close(); - } - } + } +} ``` > StepContext stepContext = StepSynchronizationManager.register(stepExecution); @@ -2012,35 +1985,34 @@ public RepeatStatus doInIteration(RepeatContext context)throws Exception{ > doInChunkContext(context, chunkContext); ```java - @Override -public RepeatStatus doInChunkContext(RepeatContext repeatContext,ChunkContext chunkContext) - throws Exception{ +@Override +public RepeatStatus doInChunkContext(RepeatContext repeatContext, ChunkContext chunkContext) + throws Exception { - StepExecution stepExecution=chunkContext.getStepContext().getStepExecution(); + StepExecution stepExecution = chunkContext.getStepContext().getStepExecution(); - // Before starting a new transaction, check for - // interruption. - interruptionPolicy.checkInterrupted(stepExecution); + // Before starting a new transaction, check for + // interruption. + interruptionPolicy.checkInterrupted(stepExecution); - RepeatStatus result; - try{ - result=new TransactionTemplate(transactionManager,transactionAttribute) - .execute(new ChunkTransactionCallback(chunkContext,semaphore)); - } - catch(UncheckedTransactionException e){ + RepeatStatus result; + try { + result = new TransactionTemplate(transactionManager, transactionAttribute) + .execute(new ChunkTransactionCallback(chunkContext, semaphore)); + } catch (UncheckedTransactionException e) { // Allow checked exceptions to be thrown inside callback - throw(Exception)e.getCause(); - } + throw (Exception) e.getCause(); + } - chunkListener.afterChunk(chunkContext); + chunkListener.afterChunk(chunkContext); - // Check for interruption after transaction as well, so that - // the interrupted exception is correctly propagated up to - // caller - interruptionPolicy.checkInterrupted(stepExecution); + // Check for interruption after transaction as well, so that + // the interrupted exception is correctly propagated up to + // caller + interruptionPolicy.checkInterrupted(stepExecution); - return result==null?RepeatStatus.FINISHED:result; - } + return result == null ? RepeatStatus.FINISHED : result; +} ``` 执行` doInChunkContext(RepeatContext repeatContext, ChunkContext chunkContext)` @@ -2067,34 +2039,31 @@ public RepeatStatus doInChunkContext(RepeatContext repeatContext,ChunkContext ch 执行 execute 方法。使用`ChunkTransactionCallback`包装`chunkContext, semaphore`俩个属性。 ```java - @Override +@Override @Nullable -public T execute(TransactionCallback action)throws TransactionException{ - Assert.state(this.transactionManager!=null,"No PlatformTransactionManager set"); +public T execute(TransactionCallback action) throws TransactionException { + Assert.state(this.transactionManager != null, "No PlatformTransactionManager set"); - if(this.transactionManager instanceof CallbackPreferringPlatformTransactionManager){ - return((CallbackPreferringPlatformTransactionManager)this.transactionManager).execute(this,action); - } - else{ - TransactionStatus status=this.transactionManager.getTransaction(this); + if (this.transactionManager instanceof CallbackPreferringPlatformTransactionManager) { + return ((CallbackPreferringPlatformTransactionManager) this.transactionManager).execute(this, action); + } else { + TransactionStatus status = this.transactionManager.getTransaction(this); T result; - try{ - result=action.doInTransaction(status); - } - catch(RuntimeException|Error ex){ - // Transactional code threw application exception -> rollback - rollbackOnException(status,ex); - throw ex; - } - catch(Throwable ex){ - // Transactional code threw unexpected exception -> rollback - rollbackOnException(status,ex); - throw new UndeclaredThrowableException(ex,"TransactionCallback threw undeclared checked exception"); + try { + result = action.doInTransaction(status); + } catch (RuntimeException | Error ex) { + // Transactional code threw application exception -> rollback + rollbackOnException(status, ex); + throw ex; + } catch (Throwable ex) { + // Transactional code threw unexpected exception -> rollback + rollbackOnException(status, ex); + throw new UndeclaredThrowableException(ex, "TransactionCallback threw undeclared checked exception"); } this.transactionManager.commit(status); return result; - } - } + } +} ``` 进入 else,里面大概逻辑就是成功就提交,失败就回滚。主要方法在`result = action.doInTransaction(status);`中。 @@ -2104,109 +2073,102 @@ public T execute(TransactionCallback action)throws TransactionException{ `TransactionCallback action`对象为`ChunkTransactionCallback` ```java - @Override -public RepeatStatus doInTransaction(TransactionStatus status){ - TransactionSynchronizationManager.registerSynchronization(this); +@Override +public RepeatStatus doInTransaction(TransactionStatus status) { + TransactionSynchronizationManager.registerSynchronization(this); - RepeatStatus result=RepeatStatus.CONTINUABLE; + RepeatStatus result = RepeatStatus.CONTINUABLE; - StepContribution contribution=stepExecution.createStepContribution(); + StepContribution contribution = stepExecution.createStepContribution(); - chunkListener.beforeChunk(chunkContext); + chunkListener.beforeChunk(chunkContext); - // In case we need to push it back to its old value - // after a commit fails... - oldVersion=new StepExecution(stepExecution.getStepName(),stepExecution.getJobExecution()); - copy(stepExecution,oldVersion); - try{ - try{ - try{ - result=tasklet.execute(contribution,chunkContext); - if(result==null){ - result=RepeatStatus.FINISHED; - } - } - catch(Exception e){ - if(transactionAttribute.rollbackOn(e)){ - chunkContext.setAttribute(ChunkListener.ROLLBACK_EXCEPTION_KEY,e); - throw e; - } - } - } - finally{ - - // If the step operations are asynchronous then we need - // to synchronize changes to the step execution (at a - // minimum). Take the lock *before* changing the step - // execution. - try{ - semaphore.acquire(); - locked=true; - } - catch(InterruptedException e){ - logger.error("Thread interrupted while locking for repository update"); - stepExecution.setStatus(BatchStatus.STOPPED); - stepExecution.setTerminateOnly(); - Thread.currentThread().interrupt(); - } + // In case we need to push it back to its old value + // after a commit fails... + oldVersion = new StepExecution(stepExecution.getStepName(), stepExecution.getJobExecution()); + copy(stepExecution, oldVersion); + try { + try { + try { + result = tasklet.execute(contribution, chunkContext); + if (result == null) { + result = RepeatStatus.FINISHED; + } + } catch (Exception e) { + if (transactionAttribute.rollbackOn(e)) { + chunkContext.setAttribute(ChunkListener.ROLLBACK_EXCEPTION_KEY, e); + throw e; + } + } + } finally { + + // If the step operations are asynchronous then we need + // to synchronize changes to the step execution (at a + // minimum). Take the lock *before* changing the step + // execution. + try { + semaphore.acquire(); + locked = true; + } catch (InterruptedException e) { + logger.error("Thread interrupted while locking for repository update"); + stepExecution.setStatus(BatchStatus.STOPPED); + stepExecution.setTerminateOnly(); + Thread.currentThread().interrupt(); + } - // Apply the contribution to the step - // even if unsuccessful - if(logger.isDebugEnabled()){ - logger.debug("Applying contribution: "+contribution); - } - stepExecution.apply(contribution); + // Apply the contribution to the step + // even if unsuccessful + if (logger.isDebugEnabled()) { + logger.debug("Applying contribution: " + contribution); + } + stepExecution.apply(contribution); } - stepExecutionUpdated=true; + stepExecutionUpdated = true; stream.update(stepExecution.getExecutionContext()); - try{ - // Going to attempt a commit. If it fails this flag will - // stay false and we can use that later. - getJobRepository().updateExecutionContext(stepExecution); - stepExecution.incrementCommitCount(); - if(logger.isDebugEnabled()){ - logger.debug("Saving step execution before commit: "+stepExecution); - } - getJobRepository().update(stepExecution); - } - catch(Exception e){ - // If we get to here there was a problem saving the step - // execution and we have to fail. - String msg="JobRepository failure forcing rollback"; - logger.error(msg,e); - throw new FatalStepExecutionException(msg,e); - } - } - catch(Error e){ - if(logger.isDebugEnabled()){ - logger.debug("Rollback for Error: "+e.getClass().getName()+": "+e.getMessage()); + try { + // Going to attempt a commit. If it fails this flag will + // stay false and we can use that later. + getJobRepository().updateExecutionContext(stepExecution); + stepExecution.incrementCommitCount(); + if (logger.isDebugEnabled()) { + logger.debug("Saving step execution before commit: " + stepExecution); + } + getJobRepository().update(stepExecution); + } catch (Exception e) { + // If we get to here there was a problem saving the step + // execution and we have to fail. + String msg = "JobRepository failure forcing rollback"; + logger.error(msg, e); + throw new FatalStepExecutionException(msg, e); + } + } catch (Error e) { + if (logger.isDebugEnabled()) { + logger.debug("Rollback for Error: " + e.getClass().getName() + ": " + e.getMessage()); } rollback(stepExecution); throw e; - } - catch(RuntimeException e){ - if(logger.isDebugEnabled()){ - logger.debug("Rollback for RuntimeException: "+e.getClass().getName()+": "+e.getMessage()); + } catch (RuntimeException e) { + if (logger.isDebugEnabled()) { + logger.debug("Rollback for RuntimeException: " + e.getClass().getName() + ": " + e.getMessage()); } rollback(stepExecution); throw e; - } - catch(Exception e){ - if(logger.isDebugEnabled()){ - logger.debug("Rollback for Exception: "+e.getClass().getName()+": "+e.getMessage()); + } catch (Exception e) { + if (logger.isDebugEnabled()) { + logger.debug("Rollback for Exception: " + e.getClass().getName() + ": " + e.getMessage()); } rollback(stepExecution); // Allow checked exceptions throw new UncheckedTransactionException(e); - } + } - return result; + return result; - } +} ``` > TransactionSynchronizationManager.registerSynchronization(this); @@ -2235,30 +2197,30 @@ public RepeatStatus doInTransaction(TransactionStatus status){ `tasklet`是`SimpleStepBuilder#createTasklet()`里面创建的。也就是`ChunkOrientedTasklet`。 ```java - public RepeatStatus execute(StepContribution contribution,ChunkContext chunkContext)throws Exception{ -@SuppressWarnings("unchecked") - Chunk inputs=(Chunk)chunkContext.getAttribute(INPUTS_KEY); - if(inputs==null){ - inputs=chunkProvider.provide(contribution); - if(buffering){ - chunkContext.setAttribute(INPUTS_KEY,inputs); - } +public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception { + @SuppressWarnings("unchecked") + Chunk inputs = (Chunk) chunkContext.getAttribute(INPUTS_KEY); + if (inputs == null) { + inputs = chunkProvider.provide(contribution); + if (buffering) { + chunkContext.setAttribute(INPUTS_KEY, inputs); } - chunkProcessor.process(contribution,inputs); - chunkProvider.postProcess(contribution,inputs); - // Allow a message coming back from the processor to say that we - // are not done yet - if(inputs.isBusy()){ + } + chunkProcessor.process(contribution, inputs); + chunkProvider.postProcess(contribution, inputs); + // Allow a message coming back from the processor to say that we + // are not done yet + if (inputs.isBusy()) { logger.debug("Inputs still busy"); return RepeatStatus.CONTINUABLE; - } - chunkContext.removeAttribute(INPUTS_KEY); - chunkContext.setComplete(); - if(logger.isDebugEnabled()){ - logger.debug("Inputs not busy, ended: "+inputs.isEnd()); - } - return RepeatStatus.continueIf(!inputs.isEnd()); - } + } + chunkContext.removeAttribute(INPUTS_KEY); + chunkContext.setComplete(); + if (logger.isDebugEnabled()) { + logger.debug("Inputs not busy, ended: " + inputs.isEnd()); + } + return RepeatStatus.continueIf(!inputs.isEnd()); +} ``` 这里又是使用`repeatOperations#.iterate(new RepeatCallback()`。这里有个点需要注意一下,在 builder @@ -2267,41 +2229,39 @@ public RepeatStatus doInTransaction(TransactionStatus status){ `chunkOperations`在这里使用的,父类的在`TaskletStep#doExecute(StepExecution stepExecution)`中使用的。 ```java - @Override -public Chunk provide(final StepContribution contribution)throws Exception{ -final Chunk inputs=new Chunk<>(); - repeatOperations.iterate(new RepeatCallback(){ @Override -public RepeatStatus doInIteration(final RepeatContext context)throws Exception{ - I item=null; - Timer.Sample sample=Timer.start(Metrics.globalRegistry); - String status=BatchMetrics.STATUS_SUCCESS; - try{ - item=read(contribution,inputs); - } - catch(SkipOverflowException e){ - // read() tells us about an excess of skips by throwing an - // exception - status=BatchMetrics.STATUS_FAILURE; - return RepeatStatus.FINISHED; - } - finally{ - stopTimer(sample,contribution.getStepExecution(),status); - } - if(item==null){ - inputs.setEnd(); - return RepeatStatus.FINISHED; - } - inputs.add(item); - contribution.incrementReadCount(); - return RepeatStatus.CONTINUABLE; +public Chunk provide(final StepContribution contribution) throws Exception { + final Chunk inputs = new Chunk<>(); + repeatOperations.iterate(new RepeatCallback() { + @Override + public RepeatStatus doInIteration(final RepeatContext context) throws Exception { + I item = null; + Timer.Sample sample = Timer.start(Metrics.globalRegistry); + String status = BatchMetrics.STATUS_SUCCESS; + try { + item = read(contribution, inputs); + } catch (SkipOverflowException e) { + // read() tells us about an excess of skips by throwing an + // exception + status = BatchMetrics.STATUS_FAILURE; + return RepeatStatus.FINISHED; + } finally { + stopTimer(sample, contribution.getStepExecution(), status); + } + if (item == null) { + inputs.setEnd(); + return RepeatStatus.FINISHED; + } + inputs.add(item); + contribution.incrementReadCount(); + return RepeatStatus.CONTINUABLE; } - }); + }); - return inputs; + return inputs; - } +} ``` 这里的`repeatOperations`也是`RepeatTemplate`。逻辑与前面的都一致,只不过`RepeatCallback` @@ -2316,43 +2276,41 @@ public RepeatStatus doInIteration(final RepeatContext context)throws Exception{ 进入方法。 ```java - @Override -public Chunk provide(final StepContribution contribution)throws Exception{ +@Override +public Chunk provide(final StepContribution contribution) throws Exception { -final Chunk inputs=new Chunk<>(); - repeatOperations.iterate(new RepeatCallback(){ + final Chunk inputs = new Chunk<>(); + repeatOperations.iterate(new RepeatCallback() { -@Override -public RepeatStatus doInIteration(final RepeatContext context)throws Exception{ - I item=null; - Timer.Sample sample=Timer.start(Metrics.globalRegistry); - String status=BatchMetrics.STATUS_SUCCESS; - try{ - item=read(contribution,inputs); - } - catch(SkipOverflowException e){ - // read() tells us about an excess of skips by throwing an - // exception - status=BatchMetrics.STATUS_FAILURE; - return RepeatStatus.FINISHED; - } - finally{ - stopTimer(sample,contribution.getStepExecution(),status); - } - if(item==null){ - inputs.setEnd(); - return RepeatStatus.FINISHED; - } - inputs.add(item); - contribution.incrementReadCount(); - return RepeatStatus.CONTINUABLE; + @Override + public RepeatStatus doInIteration(final RepeatContext context) throws Exception { + I item = null; + Timer.Sample sample = Timer.start(Metrics.globalRegistry); + String status = BatchMetrics.STATUS_SUCCESS; + try { + item = read(contribution, inputs); + } catch (SkipOverflowException e) { + // read() tells us about an excess of skips by throwing an + // exception + status = BatchMetrics.STATUS_FAILURE; + return RepeatStatus.FINISHED; + } finally { + stopTimer(sample, contribution.getStepExecution(), status); + } + if (item == null) { + inputs.setEnd(); + return RepeatStatus.FINISHED; + } + inputs.add(item); + contribution.incrementReadCount(); + return RepeatStatus.CONTINUABLE; } - }); + }); - return inputs; + return inputs; - } +} ``` 只关注一些关键代码,整体逻辑就是执行`item = read(contribution, inputs);`,出错返回` RepeatStatus.FINISHED` @@ -2368,27 +2326,27 @@ public RepeatStatus doInIteration(final RepeatContext context)throws Exception{ ```java /** * Surrounds the read call with listener callbacks. + * * @return the item or {@code null} if the data source is exhausted * @throws Exception is thrown if error occurs during read. */ @Nullable -protected final I doRead()throws Exception{ - try{ +protected final I doRead() throws Exception { + try { listener.beforeRead(); - I item=itemReader.read(); - if(item!=null){ - listener.afterRead(item); + I item = itemReader.read(); + if (item != null) { + listener.afterRead(item); } return item; - } - catch(Exception e){ - if(logger.isDebugEnabled()){ - logger.debug(e.getMessage()+" : "+e.getClass().getName()); + } catch (Exception e) { + if (logger.isDebugEnabled()) { + logger.debug(e.getMessage() + " : " + e.getClass().getName()); } listener.onReadError(e); throw e; - } - } + } +} ``` listener.beforeRead(); `ItemReadListener#beforeRead()`方法。 @@ -2411,29 +2369,29 @@ listener.beforeRead(); `ItemReadListener#beforeRead()`方法。 是`SimpleChunkProcessor chunkProcessor = new SimpleChunkProcessor<>(getProcessor(), getWriter());` ```java - @Override -public final void process(StepContribution contribution,Chunk inputs)throws Exception{ +@Override +public final void process(StepContribution contribution, Chunk inputs) throws Exception { - // Allow temporary state to be stored in the user data field - initializeUserData(inputs); + // Allow temporary state to be stored in the user data field + initializeUserData(inputs); - // If there is no input we don't have to do anything more - if(isComplete(inputs)){ + // If there is no input we don't have to do anything more + if (isComplete(inputs)) { return; - } + } - // Make the transformation, calling remove() on the inputs iterator if - // any items are filtered. Might throw exception and cause rollback. - Chunk outputs=transform(contribution,inputs); + // Make the transformation, calling remove() on the inputs iterator if + // any items are filtered. Might throw exception and cause rollback. + Chunk outputs = transform(contribution, inputs); - // Adjust the filter count based on available data - contribution.incrementFilterCount(getFilterCount(inputs,outputs)); + // Adjust the filter count based on available data + contribution.incrementFilterCount(getFilterCount(inputs, outputs)); - // Adjust the outputs if necessary for housekeeping purposes, and then - // write them out... - write(contribution,inputs,getAdjustedOutputs(inputs,outputs)); + // Adjust the outputs if necessary for housekeeping purposes, and then + // write them out... + write(contribution, inputs, getAdjustedOutputs(inputs, outputs)); - } +} ``` > initializeUserData(inputs); @@ -2441,9 +2399,9 @@ public final void process(StepContribution contribution,Chunk inputs)throws E 允许临时状态存储在用户数据字段中, ```java - protected void initializeUserData(Chunk inputs){ - inputs.setUserData(inputs.size()); - } +protected void initializeUserData(Chunk inputs){ + inputs.setUserData(inputs.size()); +} ``` > isComplete(inputs) @@ -2455,37 +2413,34 @@ public final void process(StepContribution contribution,Chunk inputs)throws E 进行转换,如果过滤了任何项目,则在输入迭代器上调用 remove()。可能会抛出异常并导致回滚。 ```java - protected Chunk transform(StepContribution contribution,Chunk inputs)throws Exception{ - Chunk outputs=new Chunk<>(); - for(Chunk.ChunkIterator iterator=inputs.iterator();iterator.hasNext();){ -final I item=iterator.next(); +protected Chunk transform(StepContribution contribution, Chunk inputs) throws Exception { + Chunk outputs = new Chunk<>(); + for (Chunk.ChunkIterator iterator = inputs.iterator(); iterator.hasNext(); ) { + final I item = iterator.next(); O output; - Timer.Sample sample=BatchMetrics.createTimerSample(); - String status=BatchMetrics.STATUS_SUCCESS; - try{ - output=doProcess(item); - } - catch(Exception e){ - /* - * For a simple chunk processor (no fault tolerance) we are done - * here, so prevent any more processing of these inputs. - */ - inputs.clear(); - status=BatchMetrics.STATUS_FAILURE; - throw e; - } - finally{ - stopTimer(sample,contribution.getStepExecution(),"item.process",status,"Item processing"); - } - if(output!=null){ - outputs.add(output); - } - else{ - iterator.remove(); - } - } - return outputs; + Timer.Sample sample = BatchMetrics.createTimerSample(); + String status = BatchMetrics.STATUS_SUCCESS; + try { + output = doProcess(item); + } catch (Exception e) { + /* + * For a simple chunk processor (no fault tolerance) we are done + * here, so prevent any more processing of these inputs. + */ + inputs.clear(); + status = BatchMetrics.STATUS_FAILURE; + throw e; + } finally { + stopTimer(sample, contribution.getStepExecution(), "item.process", status, "Item processing"); + } + if (output != null) { + outputs.add(output); + } else { + iterator.remove(); } + } + return outputs; +} ``` 只看一个方法,output = doProcess(item); @@ -2493,26 +2448,25 @@ final I item=iterator.next(); > doProcess(item) ```java - protected final O doProcess(I item)throws Exception{ +protected final O doProcess(I item) throws Exception { - if(itemProcessor==null){ -@SuppressWarnings("unchecked") - O result=(O)item; - return result; - } + if (itemProcessor == null) { + @SuppressWarnings("unchecked") + O result = (O) item; + return result; + } - try{ - listener.beforeProcess(item); - O result=itemProcessor.process(item); - listener.afterProcess(item,result); - return result; - } - catch(Exception e){ - listener.onProcessError(item,e); - throw e; - } + try { + listener.beforeProcess(item); + O result = itemProcessor.process(item); + listener.afterProcess(item, result); + return result; + } catch (Exception e) { + listener.onProcessError(item, e); + throw e; + } - } +} ``` `itemProcessor`为空直接返回,如果存在。 @@ -2531,37 +2485,35 @@ final I item=iterator.next(); 如果需要的话,调整输出以进行内务处理,然后将它们写出来...... ```java - /** +/** * Simple implementation delegates to the {@link #doWrite(List)} method and * increments the write count in the contribution. Subclasses can handle * more complicated scenarios, e.g.with fault tolerance. If output items are * skipped they should be removed from the inputs as well. * * @param contribution the current step contribution - * @param inputs the inputs that gave rise to the outputs - * @param outputs the outputs to write + * @param inputs the inputs that gave rise to the outputs + * @param outputs the outputs to write * @throws Exception if there is a problem */ -protected void write(StepContribution contribution,Chunk inputs,Chunk outputs)throws Exception{ - Timer.Sample sample=BatchMetrics.createTimerSample(); - String status=BatchMetrics.STATUS_SUCCESS; - try{ +protected void write(StepContribution contribution, Chunk inputs, Chunk outputs) throws Exception { + Timer.Sample sample = BatchMetrics.createTimerSample(); + String status = BatchMetrics.STATUS_SUCCESS; + try { doWrite(outputs.getItems()); - } - catch(Exception e){ + } catch (Exception e) { /* * For a simple chunk processor (no fault tolerance) we are done * here, so prevent any more processing of these inputs. */ inputs.clear(); - status=BatchMetrics.STATUS_FAILURE; + status = BatchMetrics.STATUS_FAILURE; throw e; - } - finally{ - stopTimer(sample,contribution.getStepExecution(),"chunk.write",status,"Chunk writing"); - } - contribution.incrementWriteCount(outputs.size()); - } + } finally { + stopTimer(sample, contribution.getStepExecution(), "chunk.write", status, "Chunk writing"); + } + contribution.incrementWriteCount(outputs.size()); +} ``` 关键俩句,`doWrite(outputs.getItems());`和 `contribution.incrementWriteCount(outputs.size());`。后面就是添加 `WriteCount`。 @@ -2569,23 +2521,22 @@ protected void write(StepContribution contribution,Chunk inputs,Chunk outp > doWrite(outputs.getItems()) ```java - protected final void doWrite(List items)throws Exception{ +protected final void doWrite(List items) throws Exception { - if(itemWriter==null){ + if (itemWriter == null) { return; - } + } - try{ + try { listener.beforeWrite(items); writeItems(items); doAfterWrite(items); - } - catch(Exception e){ - doOnWriteError(e,items); + } catch (Exception e) { + doOnWriteError(e, items); throw e; - } + } - } +} ``` `listener.beforeWrite(items)`:首先`ItemWriteListener#beforeWrite`方法。 @@ -2624,15 +2575,15 @@ locked = true; 锁=true。 继续执行`stepExecution.apply(contribution);` ```java - public synchronized void apply(StepContribution contribution){ - readSkipCount+=contribution.getReadSkipCount(); - writeSkipCount+=contribution.getWriteSkipCount(); - processSkipCount+=contribution.getProcessSkipCount(); - filterCount+=contribution.getFilterCount(); - readCount+=contribution.getReadCount(); - writeCount+=contribution.getWriteCount(); - exitStatus=exitStatus.and(contribution.getExitStatus()); - } +public synchronized void apply(StepContribution contribution) { + readSkipCount += contribution.getReadSkipCount(); + writeSkipCount += contribution.getWriteSkipCount(); + processSkipCount += contribution.getProcessSkipCount(); + filterCount += contribution.getFilterCount(); + readCount += contribution.getReadCount(); + writeCount += contribution.getWriteCount(); + exitStatus = exitStatus.and(contribution.getExitStatus()); +} ``` 相加。 @@ -2673,29 +2624,29 @@ locked = true; 锁=true。 顺序在这里可能很重要。 ```java - protected boolean isComplete(RepeatContext context,RepeatStatus result){ - boolean complete=completionPolicy.isComplete(context,result); - if(complete){ +protected boolean isComplete(RepeatContext context, RepeatStatus result) { + boolean complete = completionPolicy.isComplete(context, result); + if (complete) { logger.debug("Repeat is complete according to policy and result value."); - } - return complete; - } + } + return complete; +} ``` 状态是否为`RepeatStatus#CONTINUABLE`。 ```java - private boolean isMarkedComplete(RepeatContext context){ - boolean complete=context.isCompleteOnly(); - if(context.getParent()!=null){ - complete=complete||isMarkedComplete(context.getParent()); - } - if(complete){ +private boolean isMarkedComplete(RepeatContext context) { + boolean complete = context.isCompleteOnly(); + if (context.getParent() != null) { + complete = complete || isMarkedComplete(context.getParent()); + } + if (complete) { logger.debug("Repeat is complete according to context alone."); - } - return complete; + } + return complete; - } +} ``` 检查上一层。 @@ -2750,31 +2701,31 @@ execution.upgradeStatus(stepExecution.getStatus()); #### DefaultBatchConfigurer#initialize() ```java - try{ - if(dataSource==null){ +try { + if (dataSource == null) { logger.warn("No datasource was provided...using a Map based JobRepository"); - if(getTransactionManager()==null){ - logger.warn("No transaction manager was provided, using a ResourcelessTransactionManager"); - this.transactionManager=new ResourcelessTransactionManager(); + if (getTransactionManager() == null) { + logger.warn("No transaction manager was provided, using a ResourcelessTransactionManager"); + this.transactionManager = new ResourcelessTransactionManager(); } - MapJobRepositoryFactoryBean jobRepositoryFactory=new MapJobRepositoryFactoryBean(getTransactionManager()); + MapJobRepositoryFactoryBean jobRepositoryFactory = new MapJobRepositoryFactoryBean(getTransactionManager()); jobRepositoryFactory.afterPropertiesSet(); - this.jobRepository=jobRepositoryFactory.getObject(); + this.jobRepository = jobRepositoryFactory.getObject(); - MapJobExplorerFactoryBean jobExplorerFactory=new MapJobExplorerFactoryBean(jobRepositoryFactory); + MapJobExplorerFactoryBean jobExplorerFactory = new MapJobExplorerFactoryBean(jobRepositoryFactory); jobExplorerFactory.afterPropertiesSet(); - this.jobExplorer=jobExplorerFactory.getObject(); - }else{ - this.jobRepository=createJobRepository(); - this.jobExplorer=createJobExplorer(); - } + this.jobExplorer = jobExplorerFactory.getObject(); + } else { + this.jobRepository = createJobRepository(); + this.jobExplorer = createJobExplorer(); + } - this.jobLauncher=createJobLauncher(); - }catch(Exception e){ - throw new BatchConfigurationException(e); - } + this.jobLauncher = createJobLauncher(); +} catch (Exception e) { + throw new BatchConfigurationException(e); +} ``` 很简单的代码,`dataSource`不存在,`jobExplorer`和`jobRepository`就使用`MapJobRepositoryFactoryBean` @@ -2786,6 +2737,4 @@ execution.upgradeStatus(stepExecution.getStatus()); 里面涉及到一大堆的监听器,处理器。每个类基本都有讲到。 类上的注释非常重要~! -首先校验,然后使用`stepExecution`保存数据。ecDao 保存 ExecutionContext - -## 完整代码示例在[github](https://github.com/poo0054/spring-boot-study/tree/master/spring-batch) +首先校验,然后使用`stepExecution`保存数据。ecDao 保存 ExecutionContext。