@Service @Slf4j @RequiredArgsConstructor public class OrderProducer {
private final KafkaTemplate<String, String> kafkaTemplate;
List<Field> fields = Arrays.asList( new Field("String", true, "order_id") , new Field("String", true, "user_id") , new Field("String", true, "product_id") , new Field("int32", true, "qty") , new Field("int32", true, "unit_price") , new Field("int32", true, "total_price") );
Scheme scheme = Scheme.builder() .type("struct") .fields(fields) .optional(false) .name("orders") .build();
public OrderDto send(String topic, OrderDto orderDto){ Payload payload = Payload.builder() .order_id(orderDto.getOrderId()) .user_id(orderDto.getUserId()) .product_id(orderDto.getProductId()) .qty(orderDto.getQty()) .unit_price(orderDto.getUnitPrice()) .total_price(orderDto.getTotalPrice()) .build();
KafkaOrderDto kafkaOrderDto = new KafkaOrderDto(scheme, payload);
ObjectMapper mapper = new ObjectMapper(); String jsonInString = "";
try{ jsonInString = mapper.writeValueAsString(orderDto); }catch (JsonProcessingException ex){ ex.printStackTrace(); }
kafkaTemplate.send(topic, jsonInString); log.info("Kafka Producer sent data from the Order microservice: " + orderDto);
return orderDto; } }
|