@ -5,7 +5,6 @@
版本使用 2.7.13
```xml
< dependency >
< groupId > org.springframework.boot< / groupId >
< artifactId > spring-boot-dependencies< / artifactId >
@ -237,7 +236,7 @@ public ExitStatus afterStep(StepExecution stepExecution){
中有个`protected final CommonStepProperties properties;`属性。
对`CommonStepProperties`中`name`, `JobRepository`和`PlatformTransactionManager`进行赋值。
#### . < Integer , Integer > chunk(2)
#### < Integer , Integer > chunk(2)
```java
/**
@ -306,7 +305,6 @@ public void write(List<?extends Integer> items){
System.out.println("一次读取:" + Arrays.toString(items.toArray()));
}
};
}
```
@ -393,8 +391,7 @@ public TaskletStep build(){
try {
step.afterPropertiesSet();
}
catch(Exception e){
} catch (Exception e) {
throw new StepBuilderException(e);
}
@ -563,8 +560,7 @@ public SimpleJobBuilder start(Step step){
job.setSteps(steps);
try {
job.afterPropertiesSet();
}
catch(Exception e){
} catch (Exception e) {
throw new JobBuilderException(e);
}
return job;
@ -628,7 +624,6 @@ public SimpleJobBuilder start(Step step){
也是使用这个`JobLauncherApplicationRunner#executeRegisteredJobs(JobParameters jobParameters)`。
```java
@RestController
public class BatchController {
@ -721,8 +716,7 @@ public JobExecution createJobExecution(String jobName,JobParameters jobParameter
}
}
executionContext = ecDao.getExecutionContext(jobExecutionDao.getLastJobExecution(jobInstance));
}
else{
} else {
// no job found, create one
jobInstance = jobInstanceDao.createJobInstance(jobName, jobParameters);
executionContext = new ExecutionContext();
@ -818,23 +812,21 @@ public void updateExecutionContext(JobExecution jobExecution){
public void run() {
try {
job.execute(jobExecution);
}
catch(Throwable t){
} catch (Throwable t) {
rethrow(t);
}
}
private void rethrow(Throwable t) {
if (t instanceof RuntimeException) {
throw (RuntimeException) t;
}
else if(t instanceof Error){
} else if (t instanceof Error) {
throw (Error) t;
}
throw new IllegalStateException(t);
}
});
}
catch(TaskRejectedException e){
} catch (TaskRejectedException e) {
jobExecution.upgradeStatus(BatchStatus.FAILED);
if (jobExecution.getExitStatus().equals(ExitStatus.UNKNOWN)) {
jobExecution.setExitStatus(ExitStatus.FAILED.addExitDescription(e));
@ -1243,16 +1235,14 @@ public StepExecution handleStep(Step step,JobExecution execution)throws JobInter
if (lastStepExecution.getExecutionContext().containsKey("batch.executed")) {
currentStepExecution.getExecutionContext().remove("batch.executed");
}
}
else{
} else {
currentStepExecution.setExecutionContext(new ExecutionContext(executionContext));
}
jobRepository.add(currentStepExecution);
try {
step.execute(currentStepExecution);
currentStepExecution.getExecutionContext().put("batch.executed", true);
}
catch(JobInterruptedException e){
} catch (JobInterruptedException e) {
execution.setStatus(BatchStatus.STOPPING);
throw e;
}
@ -1334,8 +1324,7 @@ protected boolean shouldStart(StepExecution lastStepExecution,JobExecution jobEx
BatchStatus stepStatus;
if (lastStepExecution == null) {
stepStatus = BatchStatus.STARTING;
}
else{
} else {
stepStatus = lastStepExecution.getStatus();
}
@ -1358,8 +1347,7 @@ protected boolean shouldStart(StepExecution lastStepExecution,JobExecution jobEx
if (jobRepository.getStepExecutionCount(jobExecution.getJobInstance(), step.getName()) < step.getStartLimit ( ) ) {
// step start count is less than start max, return true
return true;
}
else{
} else {
// start max has been exceeded, throw an exception.
throw new StartLimitExceededException("Maximum start limit exceeded for step: " + step.getName()
+ "StartMax: " + step.getStartLimit());
@ -1463,6 +1451,7 @@ public void saveStepExecution(StepExecution stepExecution){
public void saveExecutionContext(StepExecution stepExecution) {
updateExecutionContext(stepExecution);
}
@Override
public void updateExecutionContext(StepExecution stepExecution) {
ExecutionContext executionContext = stepExecution.getExecutionContext();
@ -1510,8 +1499,7 @@ public final void execute(StepExecution stepExecution)throws JobInterruptedExcep
try {
doExecute(stepExecution);
}
catch(RepeatException e){
} catch (RepeatException e) {
throw e.getCause();
}
exitStatus = ExitStatus.COMPLETED.and(stepExecution.getExitStatus());
@ -1526,8 +1514,7 @@ public final void execute(StepExecution stepExecution)throws JobInterruptedExcep
if (logger.isDebugEnabled()) {
logger.debug("Step execution success: id=" + stepExecution.getId());
}
}
catch(Throwable e){
} catch (Throwable e) {
stepExecution.upgradeStatus(determineBatchStatus(e));
exitStatus = exitStatus.and(getDefaultExitStatusForFailure(e));
stepExecution.addFailureException(e);
@ -1536,12 +1523,10 @@ public final void execute(StepExecution stepExecution)throws JobInterruptedExcep
if (logger.isDebugEnabled()) {
logger.debug("Full exception", e);
}
}
else{
} else {
logger.error(String.format("Encountered an error executing step %s in job %s", name, stepExecution.getJobExecution().getJobInstance().getJobName()), e);
}
}
finally{
} finally {
try {
// Update the step execution to the latest known value so the
@ -1549,15 +1534,13 @@ public final void execute(StepExecution stepExecution)throws JobInterruptedExcep
exitStatus = exitStatus.and(stepExecution.getExitStatus());
stepExecution.setExitStatus(exitStatus);
exitStatus = exitStatus.and(getCompositeListener().afterStep(stepExecution));
}
catch(Exception e){
} catch (Exception e) {
logger.error(String.format("Exception in afterStep callback in step %s in job %s", name, stepExecution.getJobExecution().getJobInstance().getJobName()), e);
}
try {
getJobRepository().updateExecutionContext(stepExecution);
}
catch(Exception e){
} catch (Exception e) {
stepExecution.setStatus(BatchStatus.UNKNOWN);
exitStatus = exitStatus.and(ExitStatus.UNKNOWN);
stepExecution.addFailureException(e);
@ -1578,8 +1561,7 @@ public final void execute(StepExecution stepExecution)throws JobInterruptedExcep
}
try {
getJobRepository().update(stepExecution);
}
catch(Exception e){
} catch (Exception e) {
stepExecution.setStatus(BatchStatus.UNKNOWN);
stepExecution.setExitStatus(exitStatus.and(ExitStatus.UNKNOWN));
stepExecution.addFailureException(e);
@ -1589,8 +1571,7 @@ public final void execute(StepExecution stepExecution)throws JobInterruptedExcep
try {
close(stepExecution.getExecutionContext());
}
catch(Exception e){
} catch (Exception e) {
logger.error(String.format("Exception while closing step execution resources in step %s in job %s", name, stepExecution.getJobExecution().getJobInstance().getJobName()), e);
stepExecution.addFailureException(e);
}
@ -1708,8 +1689,7 @@ public RepeatStatus doInChunkContext(RepeatContext repeatContext,ChunkContext ch
try {
result = new TransactionTemplate(transactionManager, transactionAttribute)
.execute(new ChunkTransactionCallback(chunkContext, semaphore));
}
catch(UncheckedTransactionException e){
} catch (UncheckedTransactionException e) {
throw (Exception) e.getCause();
}
chunkListener.afterChunk(chunkContext);
@ -1755,8 +1735,7 @@ public RepeatStatus iterate(RepeatCallback callback){
// This works with an asynchronous TaskExecutor: the
// interceptors have to wait for the child processes.
result = executeInternal(callback);
}
finally{
} finally {
RepeatSynchronizationManager.clear();
if (outer != null) {
RepeatSynchronizationManager.register(outer);
@ -1820,8 +1799,7 @@ public RepeatStatus iterate(RepeatCallback callback){
result = getNextResult(context, callback, state);
executeAfterInterceptors(context, result);
}
catch(Throwable throwable){
} catch (Throwable throwable) {
doHandle(throwable, context, deferred);
}
@ -1845,8 +1823,7 @@ public RepeatStatus iterate(RepeatCallback callback){
* No need for explicit catch here - if the business processing threw an
* exception it was already handled by the helper methods. An exception
* here is necessarily fatal.
*/
finally{
*/ finally {
try {
if (!deferred.isEmpty()) {
Throwable throwable = deferred.iterator().next();
@ -1856,15 +1833,13 @@ public RepeatStatus iterate(RepeatCallback callback){
}
rethrow(throwable);
}
}
finally{
} finally {
try {
for (int i = listeners.length; i-- > 0; ) {
RepeatListener interceptor = listeners[i];
interceptor.close(context);
}
}
finally{
} finally {
context.close();
}
}
@ -1881,7 +1856,6 @@ public RepeatStatus iterate(RepeatCallback callback){
*
* @return a {@link RepeatContext} object that can be used by the implementation to store
* internal state for a batch step.
*
* @see org.springframework.batch.repeat.CompletionPolicy#start(RepeatContext)
*/
protected RepeatContext start() {
@ -1991,8 +1965,7 @@ public RepeatStatus doInIteration(RepeatContext context)throws Exception{
logger.debug("Chunk execution starting: queue size=" + attributeQueue.size());
}
return doInChunkContext(context, chunkContext);
}
finally{
} finally {
// Still some stuff to do with the data in this chunk,
// pass it back.
if (!chunkContext.isComplete()) {
@ -2026,8 +1999,7 @@ public RepeatStatus doInChunkContext(RepeatContext repeatContext,ChunkContext ch
try {
result = new TransactionTemplate(transactionManager, transactionAttribute)
.execute(new ChunkTransactionCallback(chunkContext, semaphore));
}
catch(UncheckedTransactionException e){
} catch (UncheckedTransactionException e) {
// Allow checked exceptions to be thrown inside callback
throw (Exception) e.getCause();
}
@ -2074,19 +2046,16 @@ public<T> T execute(TransactionCallback<T> action)throws TransactionException{
if (this.transactionManager instanceof CallbackPreferringPlatformTransactionManager) {
return ((CallbackPreferringPlatformTransactionManager) this.transactionManager).execute(this, action);
}
else{
} else {
TransactionStatus status = this.transactionManager.getTransaction(this);
T result;
try {
result = action.doInTransaction(status);
}
catch(RuntimeException|Error ex){
} catch (RuntimeException | Error ex) {
// Transactional code threw application exception -> rollback
rollbackOnException(status, ex);
throw ex;
}
catch(Throwable ex){
} catch (Throwable ex) {
// Transactional code threw unexpected exception -> rollback
rollbackOnException(status, ex);
throw new UndeclaredThrowableException(ex, "TransactionCallback threw undeclared checked exception");
@ -2125,15 +2094,13 @@ public RepeatStatus doInTransaction(TransactionStatus status){
if (result == null) {
result = RepeatStatus.FINISHED;
}
}
catch(Exception e){
} catch (Exception e) {
if (transactionAttribute.rollbackOn(e)) {
chunkContext.setAttribute(ChunkListener.ROLLBACK_EXCEPTION_KEY, e);
throw e;
}
}
}
finally{
} finally {
// If the step operations are asynchronous then we need
// to synchronize changes to the step execution (at a
@ -2142,8 +2109,7 @@ public RepeatStatus doInTransaction(TransactionStatus status){
try {
semaphore.acquire();
locked = true;
}
catch(InterruptedException e){
} catch (InterruptedException e) {
logger.error("Thread interrupted while locking for repository update");
stepExecution.setStatus(BatchStatus.STOPPED);
stepExecution.setTerminateOnly();
@ -2172,30 +2138,26 @@ public RepeatStatus doInTransaction(TransactionStatus status){
logger.debug("Saving step execution before commit: " + stepExecution);
}
getJobRepository().update(stepExecution);
}
catch(Exception e){
} catch (Exception e) {
// If we get to here there was a problem saving the step
// execution and we have to fail.
String msg = "JobRepository failure forcing rollback";
logger.error(msg, e);
throw new FatalStepExecutionException(msg, e);
}
}
catch(Error e){
} catch (Error e) {
if (logger.isDebugEnabled()) {
logger.debug("Rollback for Error: " + e.getClass().getName() + ": " + e.getMessage());
}
rollback(stepExecution);
throw e;
}
catch(RuntimeException e){
} catch (RuntimeException e) {
if (logger.isDebugEnabled()) {
logger.debug("Rollback for RuntimeException: " + e.getClass().getName() + ": " + e.getMessage());
}
rollback(stepExecution);
throw e;
}
catch(Exception e){
} catch (Exception e) {
if (logger.isDebugEnabled()) {
logger.debug("Rollback for Exception: " + e.getClass().getName() + ": " + e.getMessage());
}
@ -2278,14 +2240,12 @@ public RepeatStatus doInIteration(final RepeatContext context)throws Exception{
String status = BatchMetrics.STATUS_SUCCESS;
try {
item = read(contribution, inputs);
}
catch(SkipOverflowException e){
} catch (SkipOverflowException e) {
// read() tells us about an excess of skips by throwing an
// exception
status = BatchMetrics.STATUS_FAILURE;
return RepeatStatus.FINISHED;
}
finally{
} finally {
stopTimer(sample, contribution.getStepExecution(), status);
}
if (item == null) {
@ -2329,14 +2289,12 @@ public RepeatStatus doInIteration(final RepeatContext context)throws Exception{
String status = BatchMetrics.STATUS_SUCCESS;
try {
item = read(contribution, inputs);
}
catch(SkipOverflowException e){
} catch (SkipOverflowException e) {
// read() tells us about an excess of skips by throwing an
// exception
status = BatchMetrics.STATUS_FAILURE;
return RepeatStatus.FINISHED;
}
finally{
} finally {
stopTimer(sample, contribution.getStepExecution(), status);
}
if (item == null) {
@ -2368,6 +2326,7 @@ public RepeatStatus doInIteration(final RepeatContext context)throws Exception{
```java
/**
* Surrounds the read call with listener callbacks.
*
* @return the item or {@code null} if the data source is exhausted
* @throws Exception is thrown if error occurs during read.
*/
@ -2380,8 +2339,7 @@ protected final I doRead()throws Exception{
listener.afterRead(item);
}
return item;
}
catch(Exception e){
} catch (Exception e) {
if (logger.isDebugEnabled()) {
logger.debug(e.getMessage() + " : " + e.getClass().getName());
}
@ -2464,8 +2422,7 @@ final I item=iterator.next();
String status = BatchMetrics.STATUS_SUCCESS;
try {
output = doProcess(item);
}
catch(Exception e){
} catch (Exception e) {
/*
* For a simple chunk processor (no fault tolerance) we are done
* here, so prevent any more processing of these inputs.
@ -2473,14 +2430,12 @@ final I item=iterator.next();
inputs.clear();
status = BatchMetrics.STATUS_FAILURE;
throw e;
}
finally{
} finally {
stopTimer(sample, contribution.getStepExecution(), "item.process", status, "Item processing");
}
if (output != null) {
outputs.add(output);
}
else{
} else {
iterator.remove();
}
}
@ -2506,8 +2461,7 @@ final I item=iterator.next();
O result = itemProcessor.process(item);
listener.afterProcess(item, result);
return result;
}
catch(Exception e){
} catch (Exception e) {
listener.onProcessError(item, e);
throw e;
}
@ -2547,8 +2501,7 @@ protected void write(StepContribution contribution,Chunk<I> inputs,Chunk<O> outp
String status = BatchMetrics.STATUS_SUCCESS;
try {
doWrite(outputs.getItems());
}
catch(Exception e){
} catch (Exception e) {
/*
* For a simple chunk processor (no fault tolerance) we are done
* here, so prevent any more processing of these inputs.
@ -2556,8 +2509,7 @@ protected void write(StepContribution contribution,Chunk<I> inputs,Chunk<O> outp
inputs.clear();
status = BatchMetrics.STATUS_FAILURE;
throw e;
}
finally{
} finally {
stopTimer(sample, contribution.getStepExecution(), "chunk.write", status, "Chunk writing");
}
contribution.incrementWriteCount(outputs.size());
@ -2579,8 +2531,7 @@ protected void write(StepContribution contribution,Chunk<I> inputs,Chunk<O> outp
listener.beforeWrite(items);
writeItems(items);
doAfterWrite(items);
}
catch(Exception e){
} catch (Exception e) {
doOnWriteError(e, items);
throw e;
}
@ -2786,6 +2737,4 @@ execution.upgradeStatus(stepExecution.getStatus());
里面涉及到一大堆的监听器,处理器。每个类基本都有讲到。 类上的注释非常重要~!
首先校验,然后使用`stepExecution`保存数据。ecDao 保存 ExecutionContext
## 完整代码示例在[github](https://github.com/poo0054/spring-boot-study/tree/master/spring-batch)
首先校验,然后使用`stepExecution`保存数据。ecDao 保存 ExecutionContext。