Spring Batch - Chunk 기반 Step

출처

목차

Chunk

Chunk 란 여러개의 아이템을 묶은 하나의 덩어리, 블록을 의미합니다.

한번에 하나씩 아이템 입력을 받아 Chunk 단위의 덩어리로 만든 후 Chunk 단위로 트랜잭션을 처리합니다. 즉, Chunk 단위의 Commit 과 Rollback 이 이루어지게 됩니다.

일반적으로 대용량의 데이터를 한번에 처리하는 것이 아닌 Chunk 단위로 쪼개어서 더 이상 처리할 데이터가 없을 때까지 반복해서 입출력하는데 사용됩니다.

Chunk는 대용량 데이터를 처리하는 데 사용되는 방법 중 하나입니다. Chunk 지향 처리는 일괄 처리 작업을 조각 단위로 분할하여 처리하며, 각 조각은 스텝 내에서 개별적으로 처리됩니다. 예를 들어, 1000개의 항목이있는 데이터 세트가 있다면, Chunk 처리를 사용하여 이 데이터를 100개씩 10 조각으로 분할 할 수 있습니다.

Chunk는 큰 데이터 세트를 처리 할 때 매우 유용합니다. 데이터를 조각으로 나누어 처리하므로 메모리 부족 문제가 발생하지 않습니다. 대신 Chunk는 데이터를 작은 묶음으로 나누어 처리하고 일부 묶음이 완료되면 다음 묶음으로 이동합니다.

Spring Batch에서 Chunk는 ItemReader, ItemProcessor 및 ItemWriter로 구성됩니다. ItemReader는 데이터를 읽고 ItemProcessor는 데이터를 가공하고 ItemWriter는 결과를 저장합니다. Chunk 지향 처리를 사용하면 각 스텝에서 처리 할 데이터 레코드의 수를 지정할 수 있으므로 처리 속도를 향상시킬 수 있습니다.

Chunk 입력기와 출력기

입력기를 통해 Item 을 하나하나씩 읽어들여 Chunk 에 저장한 후 처리가 완료 됐을때 Item 들을 한번에 출력기로 내보낸다.

Chunk<I> 와 Chunk<O>

Chunk<I> 는 ItemReader 로 읽은 하나의 아이템을 Chunk 에서 정한 개수만큼 반복해서 저장하는 타입입니다.

Chunk<O> 는 ItemReader 로부터 전달받은 Chunk<I> 를 참조해서 ItemProcessor 에서 적절하게 가공 및 핕러링한 후 ItemWriter 에 전달하는 타입입니다.

Chunk 기반 Batch 에서의 Process

Chunk 아키텍처

ItemReader 가 Source 로부터 Item 을 읽은 후 Chunk<I> 에 저장합니다. 설정된 Chunk Size 만큼 Item 을 읽었을 후에는 ItemProcessor 에 Chunk<I> 에 저장된 Item 들을 inputs 객체로 전달합니다.

ItemProcessor 는 Iterator 를 이용해 전달받은 inputs 에 저장된 Item 들에 대한 데이터 처리를 한 후 Chunk<O> 에 처리 결과를 전달합니다.

Chunk<O> 은 ItemWriter 에 처리 결과를 전달합니다. ItemWriter 객체는 데이터를 쓰거나 저장하는 역할을 진행합니다.

Chunk 기반 Batch 에서의 Transaction 단위

Chunk 예시 코드

@Bean
public Step step1() {
return stepBuilderFactory.get("step1")
.<String, String>chunk(2)
// ItemReader
.reader(new ListItemReader<>(Arrays.asList("item1", "item2", "item3","item4", "item5", "item6")))
// ItemProcessor
.processor(new ItemProcessor<String, String>() {
@Override
public String process(String item) throws Exception {
Thread.sleep(300);
System.out.println(item);
return "my_" + item;
}
})
// ItemWriter
.writer(new ItemWriter<String>() {
@Override
public void write(List<? extends String> items) throws Exception {
Thread.sleep(1000);
System.out.println(items);
}
})
.build();
}

Lambda 로 표현

@Bean
public Step step1() {
return stepBuilderFactory.get("step1")
.<String, String>chunk(2)
.reader(new ListItemReader<>(Arrays.asList("item1", "item2", "item3","item4", "item5", "item6")))
.processor((ItemProcessor<String, String>) item -> {
Thread.sleep(300);
System.out.println(item);
return "my_" + item;
})
.writer(items -> {
Thread.sleep(1000);
System.out.println(items);
})
.build();
}
Share