Spring Batch - 21. TaskletStep

출처

목차

TaskletStep

TaskletStep 은 Step 구현체 중 하나로 Tasklet 을 실행한다.

TaskletStep 은 Step 인터페이스를 구현하며, Tasklet 을 실행하기 위한 Step 입니다. Tasklet 은 Step 의 실행 시점에 호출되는 단일한 실행 단위입니다.

TaskletStep 을 사용하는 경우, Tasklet 인터페이스를 구현한 클래스를 작성하고, 해당 클래스에서 수행할 로직을 구현합니다. 이후 TaskletStep 을 정의할 때, 이 Tasklet 클래스를 참조하여 Step을 구성합니다.

TaskletStep 을 사용하는 가장 일반적인 예는 파일을 읽어들이고, 처리한 후 결과를 파일로 출력하는 작업입니다. 이 경우, Tasklet 클래스에서 파일을 읽어들이고, 처리한 후 결과를 파일로 출력하는 로직을 구현하면 됩니다. 또한, TaskletStep 은 다양한 환경에서 사용될 수 있으며, Spring Batch의 다른 Step과 조합하여 다양한 처리 작업을 수행할 수 있습니다.

TaskletStep 아키텍처

Step 은 RepeatTemplate 를 사용해서 Tasklet 구문을 트랜잭션 경계 내에서 반복해서 실행한다.
Task 기반과 Chunk 기반으로 나누어서 Tasklet 을 실행한다.

TaskletStep 과 메타데이터

Job 이 TaskletStep 을 실행하게 되면 StepExecution 객체가 생성됩니다. StepExecution 객체는 내부적으로 ExecutionContext 를 생성해 TaskletStep 에 전달하게 됩니다.

TaskletStep 이 Step 을 실행하기 전에 StepListener 객체를 호출해 Step 실행전에 실행해야할 로직을 수행합니다.

RepeatTemplate 객체가 Tasklet 객체들을 반복적으로 수행합니다. 기본값으로 RepeatStatus.CONTINUABLE 상태로 반복해서 Tasklet 을 수행하고 Tasklet 이 정상적으로 종료된 경우에는 RepeatStatus.FINISHED 상태로 반환합니다.

Step 을 실행후에는 다시 StepListener 를 호출해 Step 실행후 실행해야할 로직을 수행합니다.

Step 이 완료후 StepExecution 객체에 Step 완료 상태를 업데이트 해줍니다. ExitStatus 와 BatchStatus 두 가지 상태를 업데이트 해줍니다.

SimpleStepHandler

SimpleJob 이 Step 을 실행시키게 되면 SimpleStepHandler 에 의해 Step 이 실행됩니다.

@Override
public StepExecution handleStep(Step step, JobExecution execution) throws JobInterruptedException,
JobRestartException, StartLimitExceededException {
if (execution.isStopping()) {
throw new JobInterruptedException("JobExecution interrupted.");
}

JobInstance jobInstance = execution.getJobInstance();

// StepExecution 가져온다.
StepExecution lastStepExecution = jobRepository.getLastStepExecution(jobInstance, step.getName());
if (stepExecutionPartOfExistingJobExecution(execution, lastStepExecution)) {
// If the last execution of this step was in the same job, it's
// probably intentional so we want to run it again...
if (logger.isInfoEnabled()) {
logger.info(String.format("Duplicate step [%s] detected in execution of job=[%s]. "
+ "If either step fails, both will be executed again on restart.", step.getName(), jobInstance
.getJobName()));
}
lastStepExecution = null;
}

StepExecution currentStepExecution = lastStepExecution;

if (shouldStart(lastStepExecution, execution, step)) {

// StepExecution 객체를 생성한다.
currentStepExecution = execution.createStepExecution(step.getName());

boolean isRestart = (lastStepExecution != null && !lastStepExecution.getStatus().equals(
BatchStatus.COMPLETED));

if (isRestart) {
currentStepExecution.setExecutionContext(lastStepExecution.getExecutionContext());

if(lastStepExecution.getExecutionContext().containsKey("batch.executed")) {
currentStepExecution.getExecutionContext().remove("batch.executed");
}
}
else {
// 현재 StepExecution 객체안에 ExecutionContext 객체를 생성해 저장한다.
currentStepExecution.setExecutionContext(new ExecutionContext(executionContext));
}

// JobRepository 객체를 통해 현재 StepExecution 객체를 저장한다.
jobRepository.add(currentStepExecution);

if (logger.isInfoEnabled()) {
logger.info("Executing step: [" + step.getName() + "]");
}

try {
// Step 을 실행한다.
step.execute(currentStepExecution);
currentStepExecution.getExecutionContext().put("batch.executed", true);
}
catch (JobInterruptedException e) {
// Ensure that the job gets the message that it is stopping
// and can pass it on to other steps that are executing
// concurrently.
execution.setStatus(BatchStatus.STOPPING);
throw e;
}

jobRepository.updateExecutionContext(execution);

if (currentStepExecution.getStatus() == BatchStatus.STOPPING
|| currentStepExecution.getStatus() == BatchStatus.STOPPED) {
// Ensure that the job gets the message that it is stopping
execution.setStatus(BatchStatus.STOPPING);
throw new JobInterruptedException("Job interrupted by step execution");
}

}

return currentStepExecution;
}

Task 기반 Chunk 기반 비교

스프링 배치에서는 Step 의 실행단위는 크게 2가지로 나누어진다. 하나는 Task 기반이고 다른 하나는 Chunk 기반입니다.

Task 기반 Step 은 단일 작업 기반으로 처리하는 것이 효율적인 경우에 사용합니다. 주로 Tasklet 구현체를 만들어서 사용하며 대량 처리를 하는 경우 Chunk 기반에 비해 더 복잡한 구현이 필요하다.

Chunk 기반 Step 은 하나의 큰 덩어리를 N 개씩 나눠서 실행한다는 의미로 대량 처리를 하는 경우 더 효율적으로 처리할 수 있습니다. ItemReader, ItemProcessor, ItemWriter 를 사용하며 Chunk 기반 전용 Tasklet 인 ChunkOrientedTasklet 구현체가 제공된다.

AbstractTaskletStepBuilder

public TaskletStep build() {

registerStepListenerAsChunkListener();

TaskletStep step = new TaskletStep(getName());

super.enhance(step);

step.setChunkListeners(chunkListeners.toArray(new ChunkListener[0]));

// Tasklet 실행에 필요한 Transaction 설정
if (transactionAttribute != null) {
step.setTransactionAttribute(transactionAttribute);
}

// 반복 실행을 위한 RepeatTemplate 설정
if (stepOperations == null) {

stepOperations = new RepeatTemplate();

if (taskExecutor != null) {
TaskExecutorRepeatTemplate repeatTemplate = new TaskExecutorRepeatTemplate();
repeatTemplate.setTaskExecutor(taskExecutor);
repeatTemplate.setThrottleLimit(throttleLimit);
stepOperations = repeatTemplate;
}

((RepeatTemplate) stepOperations).setExceptionHandler(exceptionHandler);

}
// Step 에 RepeatTemplate 설정
step.setStepOperations(stepOperations);
// Tasklet 설정
step.setTasklet(createTasklet());

step.setStreams(streams.toArray(new ItemStream[0]));

try {
step.afterPropertiesSet();
}
catch (Exception e) {
throw new StepBuilderException(e);
}

return step;

}

AbstractStep

@Override
public final void execute(StepExecution stepExecution) throws JobInterruptedException,
UnexpectedJobExecutionException {

Assert.notNull(stepExecution, "stepExecution must not be null");

if (logger.isDebugEnabled()) {
logger.debug("Executing: id=" + stepExecution.getId());
}

// StepExecution 관련 속성 저장
stepExecution.setStartTime(new Date());
stepExecution.setStatus(BatchStatus.STARTED);
Timer.Sample sample = BatchMetrics.createTimerSample();
getJobRepository().update(stepExecution);

// Start with a default value that will be trumped by anything
ExitStatus exitStatus = ExitStatus.EXECUTING;

doExecutionRegistration(stepExecution);

try {
getCompositeListener().beforeStep(stepExecution);
open(stepExecution.getExecutionContext());

try {
// Step 실행
doExecute(stepExecution);
}
catch (RepeatException e) {
throw e.getCause();
}
exitStatus = ExitStatus.COMPLETED.and(stepExecution.getExitStatus());

// Check if someone is trying to stop us
if (stepExecution.isTerminateOnly()) {
throw new JobInterruptedException("JobExecution interrupted.");
}

// Need to upgrade here not set, in case the execution was stopped
stepExecution.upgradeStatus(BatchStatus.COMPLETED);
if (logger.isDebugEnabled()) {
logger.debug("Step execution success: id=" + stepExecution.getId());
}
}
catch (Throwable e) {
stepExecution.upgradeStatus(determineBatchStatus(e));
exitStatus = exitStatus.and(getDefaultExitStatusForFailure(e));
stepExecution.addFailureException(e);
if (stepExecution.getStatus() == BatchStatus.STOPPED) {
logger.info(String.format("Encountered interruption executing step %s in job %s : %s", name, stepExecution.getJobExecution().getJobInstance().getJobName(), e.getMessage()));
if (logger.isDebugEnabled()) {
logger.debug("Full exception", e);
}
}
else {
logger.error(String.format("Encountered an error executing step %s in job %s", name, stepExecution.getJobExecution().getJobInstance().getJobName()), e);
}
}
finally {

try {
// Update the step execution to the latest known value so the
// listeners can act on it
exitStatus = exitStatus.and(stepExecution.getExitStatus());
stepExecution.setExitStatus(exitStatus);
exitStatus = exitStatus.and(getCompositeListener().afterStep(stepExecution));
}
catch (Exception e) {
logger.error(String.format("Exception in afterStep callback in step %s in job %s", name, stepExecution.getJobExecution().getJobInstance().getJobName()), e);
}

try {
getJobRepository().updateExecutionContext(stepExecution);
}
catch (Exception e) {
stepExecution.setStatus(BatchStatus.UNKNOWN);
exitStatus = exitStatus.and(ExitStatus.UNKNOWN);
stepExecution.addFailureException(e);
logger.error(String.format("Encountered an error saving batch meta data for step %s in job %s. "
+ "This job is now in an unknown state and should not be restarted.", name, stepExecution.getJobExecution().getJobInstance().getJobName()), e);
}

sample.stop(BatchMetrics.createTimer("step", "Step duration",
Tag.of("job.name", stepExecution.getJobExecution().getJobInstance().getJobName()),
Tag.of("name", stepExecution.getStepName()),
Tag.of("status", stepExecution.getExitStatus().getExitCode())
));
stepExecution.setEndTime(new Date());
stepExecution.setExitStatus(exitStatus);
Duration stepExecutionDuration = BatchMetrics.calculateDuration(stepExecution.getStartTime(), stepExecution.getEndTime());
if (logger.isInfoEnabled()) {
logger.info("Step: [" + stepExecution.getStepName() + "] executed in " + BatchMetrics.formatDuration(stepExecutionDuration));
}
try {
getJobRepository().update(stepExecution);
}
catch (Exception e) {
stepExecution.setStatus(BatchStatus.UNKNOWN);
stepExecution.setExitStatus(exitStatus.and(ExitStatus.UNKNOWN));
stepExecution.addFailureException(e);
logger.error(String.format("Encountered an error saving batch meta data for step %s in job %s. "
+ "This job is now in an unknown state and should not be restarted.", name, stepExecution.getJobExecution().getJobInstance().getJobName()), e);
}

try {
close(stepExecution.getExecutionContext());
}
catch (Exception e) {
logger.error(String.format("Exception while closing step execution resources in step %s in job %s", name, stepExecution.getJobExecution().getJobInstance().getJobName()), e);
stepExecution.addFailureException(e);
}

doExecutionRelease();

if (logger.isDebugEnabled()) {
logger.debug("Step execution complete: " + stepExecution.getSummary());
}
}
}

TaskletStep

@Override
protected void doExecute(StepExecution stepExecution) throws Exception {
stepExecution.getExecutionContext().put(TASKLET_TYPE_KEY, tasklet.getClass().getName());
stepExecution.getExecutionContext().put(STEP_TYPE_KEY, this.getClass().getName());

stream.update(stepExecution.getExecutionContext());
getJobRepository().updateExecutionContext(stepExecution);

// Shared semaphore per step execution, so other step executions can run
// in parallel without needing the lock
final Semaphore semaphore = createSemaphore();

stepOperations.iterate(new StepContextRepeatCallback(stepExecution) {

@Override
public RepeatStatus doInChunkContext(RepeatContext repeatContext, ChunkContext chunkContext)
throws Exception {

StepExecution stepExecution = chunkContext.getStepContext().getStepExecution();

// Before starting a new transaction, check for
// interruption.
interruptionPolicy.checkInterrupted(stepExecution);

RepeatStatus result;
try {
// TransactionTemplate 안에서 Transaction 처리
result = new TransactionTemplate(transactionManager, transactionAttribute)
.execute(new ChunkTransactionCallback(chunkContext, semaphore));
}
catch (UncheckedTransactionException e) {
// Allow checked exceptions to be thrown inside callback
throw (Exception) e.getCause();
}

chunkListener.afterChunk(chunkContext);

// Check for interruption after transaction as well, so that
// the interrupted exception is correctly propagated up to
// caller
interruptionPolicy.checkInterrupted(stepExecution);

return result == null ? RepeatStatus.FINISHED : result;
}

});

}

startLimit

Step 마다 설정할 수 있으며 Step 의 실행 횟수를 조정합니다. 설정된 실행 횟수를 초과해서 Step 이 실행될 경우 StartLimitExceededException 예외가 발생합니다.

@Bean
public Step step2() {
return stepBuilderFactory.get("step2")
.tasklet(new Tasklet() {
@Override
public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
System.out.println("stepContribution = " + stepContribution + ", chunkContext = " + chunkContext);
throw new RuntimeException("");
// return RepeatStatus.FINISHED;
}
})
.startLimit(3)
.build();
}

allowStartIfComplete

Step 도 Job 과 마찬가지로 실행이 성공할 경우 재시작하지 않고 실패할 경우에는 재시작합니다.

BatchStatus.COMPLETED 상태를 가진 Step 은 Job 이 재시작할때 실행하지 않고 스킵합니다. 재시작이 가능한 Job 에서 Step 의 이전 성공 여부와 상관 없이 항상 Step 을 실행하기 위해서는 allowStartIfComplete 메소드를 통해 해당 Step 이 재시작이 가능하도록 설정해줘야 합니다.

@Bean
public Step step1() {
return stepBuilderFactory.get("step1")
.tasklet(new Tasklet() {
@Override
public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
System.out.println("stepContribution = " + stepContribution + ", chunkContext = " + chunkContext);
return RepeatStatus.FINISHED;
}
})
.allowStartIfComplete(true)
.build();
}

TaskletStep 상속

Share