TaskletStep 은 Step 인터페이스를 구현하며, Tasklet 을 실행하기 위한 Step 입니다. Tasklet 은 Step 의 실행 시점에 호출되는 단일한 실행 단위입니다.
TaskletStep 을 사용하는 경우, Tasklet 인터페이스를 구현한 클래스를 작성하고, 해당 클래스에서 수행할 로직을 구현합니다. 이후 TaskletStep 을 정의할 때, 이 Tasklet 클래스를 참조하여 Step을 구성합니다.
TaskletStep 을 사용하는 가장 일반적인 예는 파일을 읽어들이고, 처리한 후 결과를 파일로 출력하는 작업입니다. 이 경우, Tasklet 클래스에서 파일을 읽어들이고, 처리한 후 결과를 파일로 출력하는 로직을 구현하면 됩니다. 또한, TaskletStep 은 다양한 환경에서 사용될 수 있으며, Spring Batch의 다른 Step과 조합하여 다양한 처리 작업을 수행할 수 있습니다.
TaskletStep 아키텍처
Step 은 RepeatTemplate 를 사용해서 Tasklet 구문을 트랜잭션 경계 내에서 반복해서 실행한다. Task 기반과 Chunk 기반으로 나누어서 Tasklet 을 실행한다.
TaskletStep 과 메타데이터
Job 이 TaskletStep 을 실행하게 되면 StepExecution 객체가 생성됩니다. StepExecution 객체는 내부적으로 ExecutionContext 를 생성해 TaskletStep 에 전달하게 됩니다.
TaskletStep 이 Step 을 실행하기 전에 StepListener 객체를 호출해 Step 실행전에 실행해야할 로직을 수행합니다.
RepeatTemplate 객체가 Tasklet 객체들을 반복적으로 수행합니다. 기본값으로 RepeatStatus.CONTINUABLE 상태로 반복해서 Tasklet 을 수행하고 Tasklet 이 정상적으로 종료된 경우에는 RepeatStatus.FINISHED 상태로 반환합니다.
Step 을 실행후에는 다시 StepListener 를 호출해 Step 실행후 실행해야할 로직을 수행합니다.
Step 이 완료후 StepExecution 객체에 Step 완료 상태를 업데이트 해줍니다. ExitStatus 와 BatchStatus 두 가지 상태를 업데이트 해줍니다.
SimpleJob 이 Step 을 실행시키게 되면 SimpleStepHandler 에 의해 Step 이 실행됩니다.
@Override public StepExecution handleStep(Step step, JobExecution execution)throws JobInterruptedException, JobRestartException, StartLimitExceededException { if (execution.isStopping()) { thrownewJobInterruptedException("JobExecution interrupted."); }
// StepExecution 가져온다. StepExecutionlastStepExecution= jobRepository.getLastStepExecution(jobInstance, step.getName()); if (stepExecutionPartOfExistingJobExecution(execution, lastStepExecution)) { // If the last execution of this step was in the same job, it's // probably intentional so we want to run it again... if (logger.isInfoEnabled()) { logger.info(String.format("Duplicate step [%s] detected in execution of job=[%s]. " + "If either step fails, both will be executed again on restart.", step.getName(), jobInstance .getJobName())); } lastStepExecution = null; }
try { // Step 을 실행한다. step.execute(currentStepExecution); currentStepExecution.getExecutionContext().put("batch.executed", true); } catch (JobInterruptedException e) { // Ensure that the job gets the message that it is stopping // and can pass it on to other steps that are executing // concurrently. execution.setStatus(BatchStatus.STOPPING); throw e; }
if (currentStepExecution.getStatus() == BatchStatus.STOPPING || currentStepExecution.getStatus() == BatchStatus.STOPPED) { // Ensure that the job gets the message that it is stopping execution.setStatus(BatchStatus.STOPPING); thrownewJobInterruptedException("Job interrupted by step execution"); }
return currentStepExecution; }
Task 기반 Chunk 기반 비교
스프링 배치에서는 Step 의 실행단위는 크게 2가지로 나누어진다. 하나는 Task 기반이고 다른 하나는 Chunk 기반입니다.
Task 기반 Step 은 단일 작업 기반으로 처리하는 것이 효율적인 경우에 사용합니다. 주로 Tasklet 구현체를 만들어서 사용하며 대량 처리를 하는 경우 Chunk 기반에 비해 더 복잡한 구현이 필요하다.
Chunk 기반 Step 은 하나의 큰 덩어리를 N 개씩 나눠서 실행한다는 의미로 대량 처리를 하는 경우 더 효율적으로 처리할 수 있습니다. ItemReader, ItemProcessor, ItemWriter 를 사용하며 Chunk 기반 전용 Tasklet 인 ChunkOrientedTasklet 구현체가 제공된다.
Assert.notNull(stepExecution, "stepExecution must not be null");
if (logger.isDebugEnabled()) { logger.debug("Executing: id=" + stepExecution.getId()); }
// StepExecution 관련 속성 저장 stepExecution.setStartTime(newDate()); stepExecution.setStatus(BatchStatus.STARTED); Timer.Samplesample= BatchMetrics.createTimerSample(); getJobRepository().update(stepExecution);
// Start with a default value that will be trumped by anything ExitStatusexitStatus= ExitStatus.EXECUTING;
// Check if someone is trying to stop us if (stepExecution.isTerminateOnly()) { thrownewJobInterruptedException("JobExecution interrupted."); }
// Need to upgrade here not set, in case the execution was stopped stepExecution.upgradeStatus(BatchStatus.COMPLETED); if (logger.isDebugEnabled()) { logger.debug("Step execution success: id=" + stepExecution.getId()); } } catch (Throwable e) { stepExecution.upgradeStatus(determineBatchStatus(e)); exitStatus = exitStatus.and(getDefaultExitStatusForFailure(e)); stepExecution.addFailureException(e); if (stepExecution.getStatus() == BatchStatus.STOPPED) { logger.info(String.format("Encountered interruption executing step %s in job %s : %s", name, stepExecution.getJobExecution().getJobInstance().getJobName(), e.getMessage())); if (logger.isDebugEnabled()) { logger.debug("Full exception", e); } } else { logger.error(String.format("Encountered an error executing step %s in job %s", name, stepExecution.getJobExecution().getJobInstance().getJobName()), e); } } finally {
try { // Update the step execution to the latest known value so the // listeners can act on it exitStatus = exitStatus.and(stepExecution.getExitStatus()); stepExecution.setExitStatus(exitStatus); exitStatus = exitStatus.and(getCompositeListener().afterStep(stepExecution)); } catch (Exception e) { logger.error(String.format("Exception in afterStep callback in step %s in job %s", name, stepExecution.getJobExecution().getJobInstance().getJobName()), e); }
try { getJobRepository().updateExecutionContext(stepExecution); } catch (Exception e) { stepExecution.setStatus(BatchStatus.UNKNOWN); exitStatus = exitStatus.and(ExitStatus.UNKNOWN); stepExecution.addFailureException(e); logger.error(String.format("Encountered an error saving batch meta data for step %s in job %s. " + "This job is now in an unknown state and should not be restarted.", name, stepExecution.getJobExecution().getJobInstance().getJobName()), e); }
sample.stop(BatchMetrics.createTimer("step", "Step duration", Tag.of("job.name", stepExecution.getJobExecution().getJobInstance().getJobName()), Tag.of("name", stepExecution.getStepName()), Tag.of("status", stepExecution.getExitStatus().getExitCode()) )); stepExecution.setEndTime(newDate()); stepExecution.setExitStatus(exitStatus); DurationstepExecutionDuration= BatchMetrics.calculateDuration(stepExecution.getStartTime(), stepExecution.getEndTime()); if (logger.isInfoEnabled()) { logger.info("Step: [" + stepExecution.getStepName() + "] executed in " + BatchMetrics.formatDuration(stepExecutionDuration)); } try { getJobRepository().update(stepExecution); } catch (Exception e) { stepExecution.setStatus(BatchStatus.UNKNOWN); stepExecution.setExitStatus(exitStatus.and(ExitStatus.UNKNOWN)); stepExecution.addFailureException(e); logger.error(String.format("Encountered an error saving batch meta data for step %s in job %s. " + "This job is now in an unknown state and should not be restarted.", name, stepExecution.getJobExecution().getJobInstance().getJobName()), e); }
try { close(stepExecution.getExecutionContext()); } catch (Exception e) { logger.error(String.format("Exception while closing step execution resources in step %s in job %s", name, stepExecution.getJobExecution().getJobInstance().getJobName()), e); stepExecution.addFailureException(e); }
// Shared semaphore per step execution, so other step executions can run // in parallel without needing the lock finalSemaphoresemaphore= createSemaphore();
// Before starting a new transaction, check for // interruption. interruptionPolicy.checkInterrupted(stepExecution);
RepeatStatus result; try { // TransactionTemplate 안에서 Transaction 처리 result = newTransactionTemplate(transactionManager, transactionAttribute) .execute(newChunkTransactionCallback(chunkContext, semaphore)); } catch (UncheckedTransactionException e) { // Allow checked exceptions to be thrown inside callback throw (Exception) e.getCause(); }
// Check for interruption after transaction as well, so that // the interrupted exception is correctly propagated up to // caller interruptionPolicy.checkInterrupted(stepExecution);
return result == null ? RepeatStatus.FINISHED : result; }
Step 마다 설정할 수 있으며 Step 의 실행 횟수를 조정합니다. 설정된 실행 횟수를 초과해서 Step 이 실행될 경우 StartLimitExceededException 예외가 발생합니다.
Step 도 Job 과 마찬가지로 실행이 성공할 경우 재시작하지 않고 실패할 경우에는 재시작합니다.
BatchStatus.COMPLETED 상태를 가진 Step 은 Job 이 재시작할때 실행하지 않고 스킵합니다. 재시작이 가능한 Job 에서 Step 의 이전 성공 여부와 상관 없이 항상 Step 을 실행하기 위해서는 allowStartIfComplete 메소드를 통해 해당 Step 이 재시작이 가능하도록 설정해줘야 합니다.