Spring - Kafka Java Config 사용하기

목자

스프링 부트 카프카 사용하기

참고

https://www.baeldung.com/spring-kafka

목자

의존성 추가

implementation 'org.springframework.kafka:spring-kafka'
testImplementation 'org.springframework.kafka:spring-kafka-test'

카프카에 접속하기 위한 설정

kafka:
bootstrapAddress: localhost:9092
message:
topic: test
consumer:
group-id: foo

Topic 설정

KafkaAdmin 는 Kafka 에 해당 Topic 이 없을 경우 자동으로 Topic 을 생성해준다.

@Configuration
public class KafkaTopicConfig {
@Value("${kafka.bootstrapAddress}")
private String bootstrapAddress;

@Bean
public KafkaAdmin kafkaAdmin(){
Map<String, Object> configs = new HashMap<>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
return new KafkaAdmin(configs);
}

@Bean
public NewTopic topic(){
// 1. Topic 명, 2. Partition 개수, 3. Replication 수
return new NewTopic("test", 1, (short) 1);
}
}

Producer 설정

ProducerConfig 를 이용해 Producer 를 사용하기 위해 필요한 값들을 설정해준다.

  • BOOTSTRAP_SERVERS_CONFIG
    • 연결하기 위한 Kafka Server 주소 정보를 설정
  • KEY_SERIALIZER_CLASS_CONFIG
    • 카프카로 Key 값을 보내기 위해 직렬화 하기 위한 정보를 설정
  • VALUE_SERIALIZER_CLASS_CONFIG
    • 카프카로 Value 값을 보내기 위해 직렬화 하기 위한 정보를 설정
  • MAX_REQUEST_SIZE_CONFIG
    • Producer 가 카프카로 보낼 수 있는 Message 최대 크기를 설정
@Configuration
public class KafkaProducerConfig {

@Value(value = "${kafka.bootstrapAddress}")
private String bootstrapAddress;

@Bean
public ProducerFactory<String, String> producerFactory(){
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "20971520");

return new DefaultKafkaProducerFactory<>(configProps);
}

@Bean
public KafkaTemplate<String, String> kafkaTemplate(){
return new KafkaTemplate<>(producerFactory());
}
}

Consummer 설정

ConsumerConfig 를 이용해 Consumer 를 사용하기 위해 필요한 값들을 설정해준다.

  • BOOTSTRAP_SERVERS_CONFIG
    • 연결하기 위한 Kafka Server 주소를 설정
  • KEY_DESERIALIZER_CLASS_CONFIG
    • 카프카에서 전달 받은 Key 값을 역질렬화 하기 위한 정보를 설정
  • VALUE_DESERIALIZER_CLASS_CONFIG
    • 카프카에서 전달 받은 Value 값을 역질렬화 하기 위한 정보를 설정
  • GROUP_ID_CONFIG
    • 카프카 Consumer Group 정보를 설정
@Configuration
@EnableKafka
public class KafkaConsumerConfig {

@Value(value = "${kafka.bootstrapAddress}")
private String bootstrapAddress;

@Value(value = "${kafka.consumer.group-id}")
private String consumerGroupId;


@Bean
public ConsumerFactory<String, String> consumerFactory(){
Map<String, Object> configProps = new HashMap<>();
configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configProps.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);

return new DefaultKafkaConsumerFactory<>(configProps);
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(){
ConcurrentKafkaListenerContainerFactory<String, String> factory
= new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());

return factory;
}
}

Producer

@Service
@RequiredArgsConstructor
@Slf4j
public class KafkaProducer {

private final KafkaTemplate<String, String> kafkaTemplate;

@Value(value = "${kafka.message.topic}")
private String topicName;


public void sendMessage(String message) {

ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topicName, message);

future.addCallback(new ListenableFutureCallback<>() {
@Override
public void onSuccess(SendResult<String, String> result) {
log.info("Sent message=[" + message + "] with offset=[" + result.getRecordMetadata()
.offset() + "]");
}

@Override
public void onFailure(Throwable ex) {
log.info("Unable to send message=[" + message + "] due to : " + ex.getMessage());
}
});
}
}

Consumer

@Service
public class KafkaConsumer {

private CountDownLatch latch = new CountDownLatch(3);

@KafkaListener(topics = "${kafka.message.topic}", groupId = "foo")
public void listenGroupFoo(String message) {
System.out.println("Received Message in group 'foo': " + message);
latch.countDown();
}
}

Controller

@RestController
@RequestMapping("/kafka")
@RequiredArgsConstructor
@Slf4j
public class KafkaController {

private final KafkaProducer kafkaProducer;

@PostMapping
public String sendMessage(@RequestParam("message") String message) {
this.kafkaProducer.sendMessage(message);
log.info("Request Success = {}", message);
return "success";
}
}
Share