Spring Cloud - 31. Kafka Topic 적용

목차

Catalog Service

// https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka
implementation 'org.springframework.kafka:spring-kafka'
/**
 * Kafka consumer configuration for the Catalog Service.
 *
 * <p>Registers a {@link ConsumerFactory} and a
 * {@link ConcurrentKafkaListenerContainerFactory} as beans, both typed for
 * String keys and String values.
 */
@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    /**
     * Builds the consumer factory from a fixed set of client properties:
     * broker address {@code 127.0.0.1:9092}, consumer group
     * {@code consumerGroupId}, and {@link StringDeserializer} for both key
     * and value.
     *
     * @return a {@link DefaultKafkaConsumerFactory} backed by those properties
     */
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> properties = new HashMap<>();

        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "consumerGroupId");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        return new DefaultKafkaConsumerFactory<>(properties);
    }

    /**
     * Creates the listener container factory and wires it to
     * {@link #consumerFactory()}.
     *
     * @return the configured listener container factory
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        // Instantiate exactly once; the previous version created a second
        // factory right after the first and threw the first one away.
        ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory =
                new ConcurrentKafkaListenerContainerFactory<>();
        kafkaListenerContainerFactory.setConsumerFactory(consumerFactory());

        return kafkaListenerContainerFactory;
    }
}

Order Service

/**
 * Kafka producer configuration for the Order Service.
 *
 * <p>Exposes a {@link ProducerFactory} and a {@link KafkaTemplate} as beans,
 * both typed for String keys and String values.
 */
@Configuration
public class KafkaProducerConfig {

    /**
     * Assembles the producer client settings (broker {@code 127.0.0.1:9092},
     * {@link StringSerializer} for key and value) and wraps them in a
     * {@link DefaultKafkaProducerFactory}.
     *
     * @return the producer factory built from those settings
     */
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        final Map<String, Object> settings = new HashMap<>();

        // Serializers for the record key and value.
        settings.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        settings.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        // Broker to connect to.
        settings.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");

        final ProducerFactory<String, String> factory = new DefaultKafkaProducerFactory<>(settings);
        return factory;
    }

    /**
     * Creates the template used to publish messages, backed by
     * {@link #producerFactory()}.
     *
     * @return a {@link KafkaTemplate} for String keys and values
     */
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        final ProducerFactory<String, String> factory = producerFactory();
        return new KafkaTemplate<>(factory);
    }
}
@Service
@Slf4j
@RequiredArgsConstructor
public class KafkaProducer {

private final KafkaTemplate<String, String> kafkaTemplate;

public OrderDto send(String kafkaTopic, OrderDto orderDto){
ObjectMapper mapper = new ObjectMapper();
String jsonInString = "";
try{
jsonInString = mapper.writeValueAsString(orderDto);
}catch (JsonProcessingException e){
e.printStackTrace();
}

kafkaTemplate.send(kafkaTopic, jsonInString);
log.info("Kafka Producer send data from the Order microservice: " + orderDto);

return orderDto;
}
}
Share