Spring Batch - ChunkOrientedTasklet

출처

목차

ChunkOrientedTasklet

ChunkOrientedTasklet은 Tasklet 인터페이스를 구현체로 Chunk 지향 처리를 담당하는 도메인 객체

ChunkOrientedTasklet은 이 execute() 메서드 내에서 ItemReader, ItemProcessor 및 ItemWriter를 사용하여 Chunk 지향 처리를 수행합니다.

TaskletStep 에 의해 반복적으로 실행되며 ChunkOrientedTasklet 이 실행될 때마다 매번 새로운 Transaction 이 생성되어 처리가 이뤄집니다. 예외가 발생할 경우 해당 Chunk 는 Rollback 되며 이전에 Commit 한 Chunk 는 완료된 상태가 유지됩니다.

내부적으로 ItemReader 를 핸들링하는 ChunkProvider 와 ItemProcessor, ItemWriter 를 핸들링하는 ChunkProcessor 타입의 구현체를 갖습니다.

ChunkOrientedTasklet 흐름

TaskletStep 이 ChunkOrientedTasklet 객체의 execute 메소드를 호출하면 ChunkProvider 객체를 호출하게 됩니다. ChunkProvider 는 ItemReader 객체를 이용해 Item 들을 설정된 Chunk 사이즈만큼 반복해서 읽어온 후 ChunkOrientedTasklet 객체로 반환합니다.

ChunkOrientedTasklet 객체는 ChunkProcessor 객체를 호출해 읽어온 Item 들을 처리합니다. ChunkProcessor 객체는 ItemProcessor 객체를 이용해 전달받은 Item 객체에 데이터 처리를 진행하고 ItemWriter 에 처리 결과를 전달합니다. ItemWriter 는 전달 받은 Item 들을 출력하거나 저장한 후 다시 ChunkProcessor 객체를 통해 Item 을 읽어오는 작업을 하게 된다.

ChunkOrientedTasklet 은 이러한 반복적인 작업은 읽을 Item 이 없을때까지 Chunk 단위로 반복합니다.

ChunkOrientedTasklet 소스코드

... 생략

// chunkProcessor 객체
private final ChunkProcessor<I> chunkProcessor;
// chunkProvider 객체
private final ChunkProvider<I> chunkProvider;

@Nullable
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {

@SuppressWarnings("unchecked")
// Chunk 처리 중 예외가 발생해 재 시도를 할 경우 다시 데이터를 읽지 않고 버퍼에 담아 놓았던 데이터를 가져옵니다.
Chunk<I> inputs = (Chunk<I>) chunkContext.getAttribute(INPUTS_KEY);
if (inputs == null) {
// ChunkProvider 객체를 이용해 Item 을 Chunk Size 만큼 반복해서 읽은 다음 Chunk<I> 객체에 저장해 반환합니다.
inputs = chunkProvider.provide(contribution);
if (buffering) {
chunkContext.setAttribute(INPUTS_KEY, inputs);
}
}

// ChunkProcessor 를 이용해 ChunkProvider 로 부터 읽은 Chunk<I> 의 아이템 개수만큼 데이터를 가공하고 저장합니다.
chunkProcessor.process(contribution, inputs);
chunkProvider.postProcess(contribution, inputs);

// Allow a message coming back from the processor to say that we
// are not done yet
if (inputs.isBusy()) {
logger.debug("Inputs still busy");
return RepeatStatus.CONTINUABLE;
}

// Chunk 단위 입출력이 완료되면 버퍼에 저장한 Chunk 데이터를 삭제합니다.
chunkContext.removeAttribute(INPUTS_KEY);
chunkContext.setComplete();

if (logger.isDebugEnabled()) {
logger.debug("Inputs not busy, ended: " + inputs.isEnd());
}

// 읽을 Item 이 더 존재하는지 체크합니다.
// 읽을 Item 이 더 있을 경우 Chunk 프로세스를 반복하고
// 읽을 Item 이 없을 경우 RepeatStatus.FINISHED 를 반환해 Chunk 프로세스를 종료합니다.
return RepeatStatus.continueIf(!inputs.isEnd());
}

... 생략

ChunkOrientedTasklet 장점

ChunkOrientedTasklet은 Chunk 지향 처리에서 처리할 수 있는 모든 항목을 읽고 처리하고 나서야 다음 스텝으로 진행합니다. 따라서 ChunkOrientedTasklet은 대용량 데이터 세트를 처리하는 데 매우 유용합니다. 메모리 부족 문제를 방지하기 위해 ChunkOrientedTasklet은 항목을 조각 단위로 처리하며, 처리할 각 조각의 크기는 설정할 수 있습니다.

Spring Batch에서 ChunkOrientedTasklet은 스텝 처리를 수행하는 방법 중 하나이며, 일괄 처리 작업을 조각 단위로 처리할 수 있도록 해줍니다. 이를 통해 대용량 데이터 세트를 처리할 때 메모리 부족 문제를 방지하고 처리 속도를 향상시킬 수 있습니다.

Share