Spring - Embedded Kafka 사용하기

목자

스프링 부트 카프카 사용하기

참고

https://www.baeldung.com/spring-boot-kafka-testing

의존성

testImplementation 'org.springframework.boot:spring-boot-starter-test'
testImplementation 'org.springframework.kafka:spring-kafka-test'

// https://mvnrepository.com/artifact/org.testcontainers/kafka
testImplementation group: 'org.testcontainers', name: 'kafka', version: '1.16.3'

Spring Properties

spring:
kafka:
consumer:
auto-offset-reset: earliest
group-id: baeldung
test:
topic: embedded-test-topic

Producer

@Component
public class KafkaProducer {

private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducer.class);

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

public void send(String topic, String payload) {
LOGGER.info("sending payload='{}' to topic='{}'", payload, topic);
kafkaTemplate.send(topic, payload);
}
}

Consumer

@Component
public class KafkaConsumer {

private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumer.class);

private CountDownLatch latch = new CountDownLatch(1);
private String payload = null;

@KafkaListener(topics = "${test.topic}")
public void receive(ConsumerRecord<?, ?> consumerRecord) {
LOGGER.info("received payload='{}'", consumerRecord.toString());
setPayload(consumerRecord.toString());
latch.countDown();
}

public CountDownLatch getLatch() {
return latch;
}

public String getPayload() {
return payload;
}

private void setPayload(String payload) {
this.payload = payload;
}
}

EmbeddedKafka 를 이용한 Test 코드 작성

  • @EmbeddedKafka 어노테이션을 사용하면 테스트 실행시 필요한 EmbeddedKafkaBroker 를 설정할 수 있다.
    • partitions : EmbeddedKafka 내 Partition 개수를 설정한다.
    • brokerProperties : EmbeddedKafka Broker 설정을 한다.
@SpringBootTest
@DirtiesContext
@EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9092", "port=9092" })
class EmbeddedKafkaApplicationTests {

@Autowired
public KafkaTemplate<String, String> template;

@Autowired
private KafkaConsumer consumer;

@Autowired
private KafkaProducer producer;

@Value("${test.topic}")
private String topic;

@Test
public void givenEmbeddedKafkaBroker_whenSendingtoDefaultTemplate_thenMessageReceived() throws Exception {
template.send(topic, "Sending with default template");
consumer.getLatch().await(10000, TimeUnit.MILLISECONDS);
assertThat(consumer.getLatch().getCount(), equalTo(0L));

assertThat(consumer.getPayload(), containsString("embedded-test-topic"));
}

@Test
public void givenEmbeddedKafkaBroker_whenSendingtoSimpleProducer_thenMessageReceived() throws Exception {
producer.send(topic, "Sending with our own simple KafkaProducer");
consumer.getLatch().await(10000, TimeUnit.MILLISECONDS);

assertThat(consumer.getLatch().getCount(), equalTo(0L));
assertThat(consumer.getPayload(), containsString("embedded-test-topic"));
}
}

Test Container 를 이용한 Kafka Test 코드 작성

  • KafkaContainer 객체를 이용해 Test 시 사용할 Kafka Container 를 가져올 수 있다.
@RunWith(SpringRunner.class)
@Import(KafkaTestContainersLiveTest.KafkaTestContainersConfiguration.class)
@SpringBootTest(classes = EmbeddedKafkaApplication.class)
@DirtiesContext
public class KafkaTestContainersLiveTest {

@ClassRule
public static KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.4.3"));

@Autowired
public KafkaTemplate<String, String> template;

@Autowired
private KafkaConsumer consumer;

@Autowired
private KafkaProducer producer;

@Value("${test.topic}")
private String topic;

@Test
public void givenKafkaDockerContainer_whenSendingtoDefaultTemplate_thenMessageReceived() throws Exception {
template.send(topic, "Sending with default template");
consumer.getLatch().await(10000, TimeUnit.MILLISECONDS);

assertThat(consumer.getLatch().getCount(), CoreMatchers.equalTo(0L));
assertThat(consumer.getPayload(), CoreMatchers.containsString("embedded-test-topic"));
}

@Test
public void givenKafkaDockerContainer_whenSendingtoSimpleProducer_thenMessageReceived() throws Exception {
producer.send(topic, "Sending with own controller");
consumer.getLatch().await(10000, TimeUnit.MILLISECONDS);

assertThat(consumer.getLatch().getCount(), CoreMatchers.equalTo(0L));
assertThat(consumer.getPayload(), CoreMatchers.containsString("embedded-test-topic"));
}

@TestConfiguration
static class KafkaTestContainersConfiguration {

@Bean
ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}

@Bean
public ConsumerFactory<Integer, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}

@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "baeldung");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return props;
}

@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}

@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}

}
}
Share