Category: Kafka

0

Spring Cloud - 33. Order Kafka Producer

목차 Spring Cloud - 33. Order Kafka Producer Spring Cloud - 32. kafka-mariadb 연결 Spring Cloud - 31. Kafka Topic 적용 Order Kafka Producer@RestController@RequestMapping("/order-service/")@RequiredArgsConstructorpublic class OrderController { private final OrderService orderService; private final KafkaProducer kafkaProducer; private final OrderProducer orderProducer; @PostMapping(value = "/{userId}/orders") public ResponseEntity<ResponseOrder> createOrder(@PathVariable("userId") String userId, @RequestBody RequestOrder orderDetails){ ModelMapper modelMapper = new ModelMapper(); modelMapper.getConfiguration().setMatchingStrategy(MatchingStrategies.STRICT); OrderDto orderDto = modelMapper.map(orderDetails, OrderDto.class); orderDto.setUserId(userId); orderDto.setOrderId(UUID.randomUUID().toString()); orderDto.setTotalPrice(orderDetails.getQty() * orderDetails.getUnitPrice()); ResponseOrder returnValue = modelMapper.map(orderDto, ResponseOrder.class); kafkaProducer.send("example-order-topic", orderDto); orderProducer.send("orders", orderDto); return ResponseEntity.status(HttpStatus.CREATED).body(returnValue); } @GetMapping(value = "/{userId}/orders") public ResponseEntity<List<ResponseOrder>> getOrder(@PathVariable("userId") String userId){ Iterable<OrderEntity> orderList = orderService.getOrdersByUserId(userId); List<ResponseOrder> result = new ArrayList<>(); orderList.forEach(v -> { result.add(new ModelMapper().map(v, ResponseOrder.class)); }); return ResponseEntity.status(HttpStatus.OK).body(result); }} @Service@Slf4j@RequiredArgsConstructorpublic class OrderProducer { private final KafkaTemplate<String, String> kafkaTemplate; public OrderDto send(String topic, OrderDto orderDto){ ObjectMapper mapper = new ObjectMapper(); String jsonInString = ""; try{ jsonInString = mapper.writeValueAsString(orderDto); }catch (JsonProcessingException ex){ ex.printStackTrace(); } kafkaTemplate.send(topic, jsonInString); log.info("Kafka Producer sent data from the Order microservice: " + orderDto); return orderDto; }} @Data@Builderpublic class Scheme { private String type; private List<Field> fields; private boolean optional; private String name;} @Data@AllArgsConstructorpublic class Field { private String type; private boolean optional; private String field;} @Data@Builderpublic class Payload { private String order_id; private String user_id; private String product_id; private int qty; private int unit_price; private int total_price;} @Data@AllArgsConstructorpublic class KafkaOrderDto implements Serializable { private Scheme scheme; private Payload payload;}

0

Spring Cloud - 32. kafka-mariadb 연결

목차 Spring Cloud - 33. Order Kafka Producer Spring Cloud - 32. kafka-mariadb 연결 Spring Cloud - 31. Kafka Topic 적용 Config// https://mvnrepository.com/artifact/org.mariadb.jdbc/mariadb-java-clientimplementation 'org.mariadb.jdbc:mariadb-java-client' create table users( id int auto_increment primary key, user_id varchar(20), pwd varchar(20), name varchar(20), created_at datetime default NOW()); Kafka Connect 설치curl -O http://packages.confluent.io/archive/5.5/confluent-community-5.5.2-2.12.tar.gz curl -O http://packages.confluent.io/archive/6.1/confluent-community-6.1.0.tar.gz tar xvf confluent-community-6.1.0.tar.gz

0

Spring Cloud - 31. Kafka Topic 적용

목차 Spring Cloud - 33. Order Kafka Producer Spring Cloud - 32. kafka-mariadb 연결 Spring Cloud - 31. Kafka Topic 적용 Catalog Service// https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafkaimplementation 'org.springframework.kafka:spring-kafka' @EnableKafka@Configurationpublic class KafkaConsumerConfig { @Bean public ConsumerFactory<String, String> consumerFactory(){ Map<String, Object> properties = new HashMap<>(); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); properties.put(ConsumerConfig.GROUP_ID_CONFIG, "consumerGroupId"); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); return new DefaultKafkaConsumerFactory<>(properties); } @Bean public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(){ ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory = new ConcurrentKafkaListenerContainerFactory<>(); kafkaListenerContainerFactory = new ConcurrentKafkaListenerContainerFactory<>(); kafkaListenerContainerFactory.setConsumerFactory(consumerFactory()); return kafkaListenerContainerFactory; }} Order Service@Configurationpublic class KafkaProducerConfig { @Bean public ProducerFactory<String, String> producerFactory(){ Map<String, Object> configProps = new HashMap<>(); configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return new DefaultKafkaProducerFactory<>(configProps); } @Bean public KafkaTemplate<String, String> kafkaTemplate(){ return new KafkaTemplate<>(producerFactory()); }} @Service@Slf4j@RequiredArgsConstructorpublic class KafkaProducer { private final KafkaTemplate<String, String> kafkaTemplate; public OrderDto send(String kafkaTopic, OrderDto orderDto){ ObjectMapper mapper = new ObjectMapper(); String jsonInString = ""; try{ jsonInString = mapper.writeValueAsString(orderDto); }catch (JsonProcessingException e){ e.printStackTrace(); } kafkaTemplate.send(kafkaTopic, jsonInString); log.info("Kafka Producer send data from the Order microservice: " + orderDto); return orderDto; }}