Spring - Kafka

스프링 부트 카프카 사용하기

목자

의존성 추가

implementation 'org.springframework.kafka:spring-kafka'
testImplementation 'org.springframework.kafka:spring-kafka-test'

카프카에 접속하기 위한 설정

Consumer 설정

  • spring.kafka.consumer.bootstrap-servers
    • 카프카에 연결을 하기 위한 접속 주소
  • spring.kafka.consumer.group-id
    • kafka Consumer 그룹에 대한 정보
  • spring.kafka.consumer.auto-offset-reset
    • topic에 붙은 consumer의 offset 정보가 존재하지 않거나 오류가 발생해 offset을 사용할 수 없는 경우 처리하기 위한 옵션
    • latest : 가장 마지막 offset부터
    • earliest : 가장 처음 offset부터
    • none : offset 정보가 없으면 Exception 발생
  • spring.kafka.consumer.key-deserializer
    • 카프카에서 전달받은 key 를 역질렬화 할때 사용하는 역질렬화 클래스
    • StringDeserializer 는 문자열 데이터만 사용 가능한 역직렬화 도구
  • spring.kafka.consumer.value-deserializer
    • 카프카에서 전달받은 value 를 역질렬화 할때 사용하는 역질렬화 클래스
    • StringDeserializer 는 문자열 데이터만 사용 가능한 역직렬화 도구

Producer 설정

  • spring.kafka.producer.bootstrap-servers
    • 카프카에 연결을 하기 위한 접속 주소
  • spring.kafka.producer.key-serializer
    • Message 를 카프카 브로커에 전송할 때 사용하는 Serializer
    • 어떤 직렬화 클래스가 key 값을 바이트 배열로 변환했을 때 사용됐는지 알려준다.
    • StringSerializer 는 문자열 데이터만 사용 가능한 직렬화 도구
  • spring.kafka.producer.value-serializer
    • Message 를 카프카 브로커에 전송할 때 사용하는 Serializer
    • 어떤 직렬화 클래스가 value 값을 바이트 배열로 변환했을 때 사용됐는지 알려준다.
    • StringSerializer 는 문자열 데이터만 사용 가능한 직렬화 도구

application.yml

spring:
kafka:
consumer:
bootstrap-servers: localhost:9092
group-id: foo
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
producer:
bootstrap-servers: localhost:9092
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer

Kafka Producer

  • Producer 에서는 KafkaTemplate 을 이용해 Kafka 내 특정 Topic 으로 메시지를 전송한다.

KafkaProducer.java

@Service
@RequiredArgsConstructor
public class KafkaProducer {
private static final String TOPIC = "exam";
private final KafkaTemplate<String, String> kafkaTemplate;

public void sendMessage(String message){
System.out.println(String.format("Produce message : %s", message));
this.kafkaTemplate.send(TOPIC, message);
}
}

KafkaController.java

@RestController
@RequestMapping("/kafka")
public class KafkaController {
private final KafkaProducer producer;

@Autowired
KafkaController(KafkaProducer producer) {
this.producer = producer;
}

@PostMapping
public String sendMessage(@RequestParam("message") String message) {
this.producer.sendMessage(message);

return "success";
}
}

Kafka Consumer

  • @KafkaListener 어노테이션을 이용해 Consumer 설정을 해준다.
    • groupId : 해당 Consumer 가 속한 Consumer Group
    • topics : 해당 Consumer 가 구독하는 Topic

KafkaConsumer.java

@Service
public class KafkaConsumer {
@KafkaListener(topics = "exam", groupId = "foo")
public void consume(String message) throws IOException{

System.out.println(String.format("Consumed message : %s", message));
}
}
Share