Spring Batch - ChunkProvider 와 ChunkProcessor

출처

목차

ChunkProvider

ChunkProvider는 ItemReader에서 읽어온 데이터를 Chunk 단위로 분할하여 ItemProcessor 및 ItemWriter에 전달하는 객체

ChunkProvider는 스텝을 실행할 때 Chunk<I> 객체를 만들고 Chunk의 크기를 지정합니다. 예를 들어, Chunk의 크기를 10으로 설정하면, ItemReader에서 10개의 데이터를 읽어와서 ItemProcessor 와 ItemWriter에 전달합니다. 이러한 과정을 Chunk 단위로 반복하여 모든 데이터를 처리합니다.

ChunkProvider는 ItemReader에서 반환된 모든 데이터를 저장하지 않고, Chunk 단위로 처리하기 때문에 대용량 데이터를 처리하는 데 매우 효율적입니다. 이를 통해 메모리 사용량을 최소화하고, 처리 속도를 향상시킬 수 있습니다.

Spring Batch에서는 기본적으로 SimpleChunkProvider를 제공합니다. SimpleChunkProvider는 Chunk의 크기를 지정하고, ItemReader에서 읽어온 데이터를 Chunk 단위로 분할하여 ItemProcessor 및 ItemWriter에 전달합니다. 또한, SimpleChunkProvider는 실패 시 롤백을 수행할 수 있도록 구성할 수 있습니다.

ChunkProvider는 Chunk 기반 스텝의 핵심 역할을 수행하므로, ChunkProvider의 구현 방식은 대용량 데이터 처리 성능에 큰 영향을 미칩니다. 최적의 ChunkProvider 구현 방식을 선택하여 대용량 데이터 처리를 효율적으로 수행할 수 있습니다.

ChunkProvider 인터페이스

public interface ChunkProvider<T> {

Chunk<T> provide(StepContribution contribution) throws Exception;

void postProcess(StepContribution contribution, Chunk<T> chunk);

}

SimpleChunkProvider

@Override
public Chunk<I> provide(final StepContribution contribution) throws Exception {

// Chunk 객체를 생성한다.
final Chunk<I> inputs = new Chunk<>();
// Chunk Size 만큼 repeatOperations 를 이용해 read() 를 호출한다.
repeatOperations.iterate(new RepeatCallback() {

@Override
public RepeatStatus doInIteration(final RepeatContext context) throws Exception {
I item = null;
Timer.Sample sample = Timer.start(Metrics.globalRegistry);
String status = BatchMetrics.STATUS_SUCCESS;
try {
// ItemReader 를 이용해 Item 을 하나씩 읽는다.
item = read(contribution, inputs);
}
catch (SkipOverflowException e) {
// read() tells us about an excess of skips by throwing an
// exception
status = BatchMetrics.STATUS_FAILURE;
return RepeatStatus.FINISHED;
}
finally {
stopTimer(sample, contribution.getStepExecution(), status);
}

// 더 이상 읽을 Item 이 없을 경우 반복문 종료 및 전체 Chunk 프로세스 종료
if (item == null) {
inputs.setEnd();
return RepeatStatus.FINISHED;
}

// Chunk 에 ItemReader 로부터 읽은 Item 을 Chunk Size 만큼 저장한다
inputs.add(item);
contribution.incrementReadCount();
return RepeatStatus.CONTINUABLE;
}

});

return inputs;

}

ChunkProcessor

ChunkProcessor 는 Chunk 단위로 ItemProcessor 를 사용해 Item 을 변형, 가공, 필터링하고 ItemWriter 를 사용해 Chunk 데이터를 저장 및 출력한 후 스텝의 실행 결과를 반환하는 역할을 수행합니다.

Chunk<O> 객체를 만들고 ChunkProvider 로부터 전달 받은 Chunk<I> 의 Item 을 한 건씩 처리한 후에 Chunk<O> 에 저장합니다.

ChunkProcessor 는 ChunkProvider 에서 전달받은 Chunk 를 이용하여 ItemProcessor 와 ItemWriter 를 실행하고, 스텝의 실행 결과를 반환하는 역할을 수행합니다. ChunkProcessor 는 각 Chunk에서 ItemProcessor 를 이용하여 데이터를 처리하고, 처리 결과를 ItemWriter 를 통해 출력합니다. 처리 결과는 Chunk 단위로 모아서 스텝의 실행 결과를 결정합니다.

Spring Batch에서는 기본적으로 SimpleChunkProcessor 를 제공합니다. SimpleChunkProcessor 는 ChunkProvider 에서 Chunk 단위로 전달받은 데이터를 ItemProcessor 및 ItemWriter 에 전달하여 처리하고, 스텝의 실행 결과를 반환합니다. 또한, SimpleChunkProcessor 는 실패 시 롤백을 수행할 수 있도록 구성할 수 있습니다.

ItemWriter 처리가 완료되면 Chunk 트랜잭션이 종료되고 Step 반복문에서 ChunkOrientedTasklet 이 새롭게 실행됩니다. ItemWriter 는 Chunk 사이즈만큼 데이터를 Commit 하기 때문에 Chunk 사이즈는 곧 Commit Interval 이 됩니다.

ChunkProcessor는 대용량 데이터 처리 성능에 큰 영향을 미치므로, 최적의 ChunkProcessor 구현 방식을 선택하여 대용량 데이터 처리를 효율적으로 수행할 수 있습니다.

public interface ChunkProcessor<I> {

void process(StepContribution contribution, Chunk<I> chunk) throws Exception;
}

SimpleChunkProcessor

@Override
public final void process(StepContribution contribution, Chunk<I> inputs) throws Exception {

// Allow temporary state to be stored in the user data field
initializeUserData(inputs);

// If there is no input we don't have to do anything more
if (isComplete(inputs)) {
return;
}

// ItemProcessor 객체를 이용해 Item 을 처리한 후 Chunk<O> 로 반환한다.
Chunk<O> outputs = transform(contribution, inputs);

// ItemProcessor 객체를 이용해 Item 을 Filtering 된 Item 들을 저장한다.
contribution.incrementFilterCount(getFilterCount(inputs, outputs));

// ItemWriter 객체를 이용해 Item 들을 저잫아거나 출력한다.
write(contribution, inputs, getAdjustedOutputs(inputs, outputs));

}
Share