목자
스프링 부트 카프카 사용하기
참고
https://www.baeldung.com/spring-kafka
목자
의존성 추가
implementation 'org.springframework.kafka:spring-kafka' testImplementation 'org.springframework.kafka:spring-kafka-test'
|
카프카에 접속하기 위한 설정
kafka: bootstrapAddress: localhost:9092 message: topic: test consumer: group-id: foo
|
Topic 설정
KafkaAdmin 는 Kafka 에 해당 Topic 이 없을 경우 자동으로 Topic 을 생성해준다.
@Configuration public class KafkaTopicConfig { @Value("${kafka.bootstrapAddress}") private String bootstrapAddress;
@Bean public KafkaAdmin kafkaAdmin(){ Map<String, Object> configs = new HashMap<>(); configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); return new KafkaAdmin(configs); }
@Bean public NewTopic topic(){ return new NewTopic("test", 1, (short) 1); } }
|
Producer 설정
ProducerConfig
를 이용해 Producer 를 사용하기 위해 필요한 값들을 설정해준다.
- BOOTSTRAP_SERVERS_CONFIG
- 연결하기 위한 Kafka Server 주소 정보를 설정
- KEY_SERIALIZER_CLASS_CONFIG
- 카프카로 Key 값을 보내기 위해 직렬화 하기 위한 정보를 설정
- VALUE_SERIALIZER_CLASS_CONFIG
- 카프카로 Value 값을 보내기 위해 직렬화 하기 위한 정보를 설정
- MAX_REQUEST_SIZE_CONFIG
- Producer 가 카프카로 보낼 수 있는 Message 최대 크기를 설정
@Configuration public class KafkaProducerConfig {
@Value(value = "${kafka.bootstrapAddress}") private String bootstrapAddress;
@Bean public ProducerFactory<String, String> producerFactory(){ Map<String, Object> configProps = new HashMap<>(); configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); configProps.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "20971520");
return new DefaultKafkaProducerFactory<>(configProps); }
@Bean public KafkaTemplate<String, String> kafkaTemplate(){ return new KafkaTemplate<>(producerFactory()); } }
|
Consummer 설정
ConsumerConfig
를 이용해 Consumer 를 사용하기 위해 필요한 값들을 설정해준다.
- BOOTSTRAP_SERVERS_CONFIG
- 연결하기 위한 Kafka Server 주소를 설정
- KEY_DESERIALIZER_CLASS_CONFIG
- 카프카에서 전달 받은 Key 값을 역질렬화 하기 위한 정보를 설정
- VALUE_DESERIALIZER_CLASS_CONFIG
- 카프카에서 전달 받은 Value 값을 역질렬화 하기 위한 정보를 설정
- GROUP_ID_CONFIG
- 카프카 Consumer Group 정보를 설정
@Configuration @EnableKafka public class KafkaConsumerConfig {
@Value(value = "${kafka.bootstrapAddress}") private String bootstrapAddress;
@Value(value = "${kafka.consumer.group-id}") private String consumerGroupId;
@Bean public ConsumerFactory<String, String> consumerFactory(){ Map<String, Object> configProps = new HashMap<>(); configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); configProps.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
return new DefaultKafkaConsumerFactory<>(configProps); }
@Bean public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(){ ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory());
return factory; } }
|
Producer
@Service @RequiredArgsConstructor @Slf4j public class KafkaProducer {
private final KafkaTemplate<String, String> kafkaTemplate; @Value(value = "${kafka.message.topic}") private String topicName;
public void sendMessage(String message) {
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topicName, message);
future.addCallback(new ListenableFutureCallback<>() { @Override public void onSuccess(SendResult<String, String> result) { log.info("Sent message=[" + message + "] with offset=[" + result.getRecordMetadata() .offset() + "]"); }
@Override public void onFailure(Throwable ex) { log.info("Unable to send message=[" + message + "] due to : " + ex.getMessage()); } }); } }
|
Consumer
@Service public class KafkaConsumer {
private CountDownLatch latch = new CountDownLatch(3);
@KafkaListener(topics = "${kafka.message.topic}", groupId = "foo") public void listenGroupFoo(String message) { System.out.println("Received Message in group 'foo': " + message); latch.countDown(); } }
|
Controller
@RestController @RequestMapping("/kafka") @RequiredArgsConstructor @Slf4j public class KafkaController {
private final KafkaProducer kafkaProducer;
@PostMapping public String sendMessage(@RequestParam("message") String message) { this.kafkaProducer.sendMessage(message); log.info("Request Success = {}", message); return "success"; } }
|