Spring Cloud - 33. Order Kafka Producer

목차

Order Kafka Producer

/**
 * REST endpoints for creating and listing a user's orders.
 *
 * <p>Creating an order does not call {@code orderService}; the new order is
 * published to Kafka through {@code kafkaProducer} and {@code orderProducer}.
 * Listing orders reads them through {@code orderService}.
 */
@RestController
@RequestMapping("/order-service/")
@RequiredArgsConstructor
public class OrderController {

    private final OrderService orderService;
    private final KafkaProducer kafkaProducer;
    private final OrderProducer orderProducer;

    /**
     * Builds an order from the request body, assigns it a random UUID and a
     * total price, publishes it to Kafka and echoes it back.
     *
     * @param userId       id of the ordering user, taken from the path
     * @param orderDetails requested product, quantity and unit price
     * @return HTTP 201 with the created order
     */
    @PostMapping(value = "/{userId}/orders")
    public ResponseEntity<ResponseOrder> createOrder(@PathVariable("userId") String userId,
                                                     @RequestBody RequestOrder orderDetails) {
        ModelMapper modelMapper = new ModelMapper();
        modelMapper.getConfiguration().setMatchingStrategy(MatchingStrategies.STRICT);

        OrderDto orderDto = modelMapper.map(orderDetails, OrderDto.class);
        orderDto.setUserId(userId);

        // The id and total are computed here because the order is handed to
        // Kafka directly rather than passed through orderService.
        orderDto.setOrderId(UUID.randomUUID().toString());
        orderDto.setTotalPrice(orderDetails.getQty() * orderDetails.getUnitPrice());
        ResponseOrder returnValue = modelMapper.map(orderDto, ResponseOrder.class);

        kafkaProducer.send("example-order-topic", orderDto);
        orderProducer.send("orders", orderDto);

        return ResponseEntity.status(HttpStatus.CREATED).body(returnValue);
    }

    /**
     * Returns every order that {@code orderService} reports for the given user.
     *
     * @param userId id of the user, taken from the path
     * @return HTTP 200 with the (possibly empty) list of orders
     */
    @GetMapping(value = "/{userId}/orders")
    public ResponseEntity<List<ResponseOrder>> getOrder(@PathVariable("userId") String userId) {
        Iterable<OrderEntity> orderList = orderService.getOrdersByUserId(userId);

        // One mapper for the whole response instead of a new one per element.
        // Default matching strategy is kept, as before.
        ModelMapper modelMapper = new ModelMapper();

        List<ResponseOrder> result = new ArrayList<>();
        for (OrderEntity entity : orderList) {
            result.add(modelMapper.map(entity, ResponseOrder.class));
        }

        return ResponseEntity.status(HttpStatus.OK).body(result);
    }
}
@Service
@Slf4j
@RequiredArgsConstructor
public class OrderProducer {

private final KafkaTemplate<String, String> kafkaTemplate;

public OrderDto send(String topic, OrderDto orderDto){
ObjectMapper mapper = new ObjectMapper();
String jsonInString = "";

try{
jsonInString = mapper.writeValueAsString(orderDto);
}catch (JsonProcessingException ex){
ex.printStackTrace();
}

kafkaTemplate.send(topic, jsonInString);
log.info("Kafka Producer sent data from the Order microservice: " + orderDto);

return orderDto;
}
}
/**
 * Schema descriptor that travels next to a {@code Payload} inside
 * {@code KafkaOrderDto}.
 *
 * <p>Lombok generates the accessors, equals/hashCode/toString ({@code @Data})
 * and a fluent builder ({@code @Builder}); the field names below therefore
 * define both the builder method names and the bean property names.
 */
@Data
@Builder
public class Scheme {
// Kind of the described value; OrderProducer sets this to "struct".
private String type;
// One entry per described field.
private List<Field> fields;
// Whether the described value as a whole may be absent.
private boolean optional;
// Name of the schema; OrderProducer sets this to "orders".
private String name;
}
/**
 * Describes a single field listed in a {@code Scheme}.
 *
 * <p>{@code @AllArgsConstructor} generates a constructor whose parameters
 * follow the declaration order below: {@code (type, optional, field)}.
 * Callers construct instances positionally, so the order must not change.
 */
@Data
@AllArgsConstructor
public class Field {
// Type name of the field, e.g. "int32".
private String type;
// Whether the field may be absent.
private boolean optional;
// Name of the field, e.g. "order_id".
private String field;
}
/**
 * Order values that travel next to a {@code Scheme} inside
 * {@code KafkaOrderDto}.
 *
 * <p>Field names are snake_case and match the field names that
 * {@code OrderProducer} lists in its schema ("order_id", "user_id", ...).
 * NOTE(review): presumably the names are kept this way so the serialized keys
 * line up with those schema entries - confirm before renaming.
 */
@Data
@Builder
public class Payload {
private String order_id;
private String user_id;
private String product_id;
private int qty;
private int unit_price;
private int total_price;
}
/**
 * Envelope pairing a {@code Scheme} with the {@code Payload} it describes.
 *
 * <p>{@code @AllArgsConstructor} generates a constructor taking
 * {@code (scheme, payload)} in declaration order.
 *
 * <p>NOTE(review): the class is marked {@code Serializable}, but neither
 * {@code Scheme} nor {@code Payload} implements it in this file, so Java
 * native serialization of an instance would fail - confirm whether the
 * marker is needed at all.
 *
 * <p>NOTE(review): the bean property is named "scheme"; if the consumer is
 * Kafka Connect's JsonConverter, it looks for an envelope key named
 * "schema" - verify against the consumer.
 */
@Data
@AllArgsConstructor
public class KafkaOrderDto implements Serializable {
private Scheme scheme;
private Payload payload;
}
@Service
@Slf4j
@RequiredArgsConstructor
public class OrderProducer {

private final KafkaTemplate<String, String> kafkaTemplate;

List<Field> fields = Arrays.asList(
new Field("String", true, "order_id")
, new Field("String", true, "user_id")
, new Field("String", true, "product_id")
, new Field("int32", true, "qty")
, new Field("int32", true, "unit_price")
, new Field("int32", true, "total_price")
);

Scheme scheme = Scheme.builder()
.type("struct")
.fields(fields)
.optional(false)
.name("orders")
.build();

public OrderDto send(String topic, OrderDto orderDto){
Payload payload = Payload.builder()
.order_id(orderDto.getOrderId())
.user_id(orderDto.getUserId())
.product_id(orderDto.getProductId())
.qty(orderDto.getQty())
.unit_price(orderDto.getUnitPrice())
.total_price(orderDto.getTotalPrice())
.build();

KafkaOrderDto kafkaOrderDto = new KafkaOrderDto(scheme, payload);

ObjectMapper mapper = new ObjectMapper();
String jsonInString = "";

try{
jsonInString = mapper.writeValueAsString(orderDto);
}catch (JsonProcessingException ex){
ex.printStackTrace();
}

kafkaTemplate.send(topic, jsonInString);
log.info("Kafka Producer sent data from the Order microservice: " + orderDto);

return orderDto;
}
}
Share