목차
Spring Cloud - 33. Order Kafka Producer
Spring Cloud - 32. kafka-mariadb 연결
Spring Cloud - 31. Kafka Topic 적용
Catalog Service// https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafkaimplementation 'org.springframework.kafka:spring-kafka'
@EnableKafka@Configurationpublic class KafkaConsumerConfig { @Bean public ConsumerFactory<String, String> consumerFactory(){ Map<String, Object> properties = new HashMap<>(); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); properties.put(ConsumerConfig.GROUP_ID_CONFIG, "consumerGroupId"); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); return new DefaultKafkaConsumerFactory<>(properties); } @Bean public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(){ ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory = new ConcurrentKafkaListenerContainerFactory<>(); kafkaListenerContainerFactory = new ConcurrentKafkaListenerContainerFactory<>(); kafkaListenerContainerFactory.setConsumerFactory(consumerFactory()); return kafkaListenerContainerFactory; }}
Order Service@Configurationpublic class KafkaProducerConfig { @Bean public ProducerFactory<String, String> producerFactory(){ Map<String, Object> configProps = new HashMap<>(); configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return new DefaultKafkaProducerFactory<>(configProps); } @Bean public KafkaTemplate<String, String> kafkaTemplate(){ return new KafkaTemplate<>(producerFactory()); }}
@Service@Slf4j@RequiredArgsConstructorpublic class KafkaProducer { private final KafkaTemplate<String, String> kafkaTemplate; public OrderDto send(String kafkaTopic, OrderDto orderDto){ ObjectMapper mapper = new ObjectMapper(); String jsonInString = ""; try{ jsonInString = mapper.writeValueAsString(orderDto); }catch (JsonProcessingException e){ e.printStackTrace(); } kafkaTemplate.send(kafkaTopic, jsonInString); log.info("Kafka Producer send data from the Order microservice: " + orderDto); return orderDto; }}