Spring boot Web Socket - Chat 프로그램 만들기 1 참고
Message Broker Message Broker 는 Publisher 로부터 전달 받은 Message 를 Subscriber 로 전달하는 중간 역할을 하며 응용 소프트웨어 간에 메시지를 교환할 수 있게 한다. 이때, 메시지가 적재되는 공간은 Message Queue 라고 하며 메시지 그룹을 Topic 이라고 한다.
Message Broker 는 데이터를 보내고 처리하고 삭제한다.
Message Interceptor 정의하기
ChannelInterceptor
는 Message 를 MessageChannel 로 보내기 전과 Message 가 MessageChannel 로 보낸 후 추가적일 로직을 수행할 수 있게 도와주는 Interface 다.
ChannelInterceptor 인터페이스를 구현해 Socket 통신 상태를 로그로 찍어 확인한다.
@Slf4j @Component @RequiredArgsConstructor public class StompHandler implements ChannelInterceptor { @Override public Message<?> preSend(Message<?> message, MessageChannel channel) { StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message); if (StompCommand.CONNECT.equals(accessor.getCommand())){ log.info("CONNECT" ); }else if (StompCommand.SUBSCRIBE.equals(accessor.getCommand())){ log.info("SUBSCRIBE" ); }else if (StompCommand.DISCONNECT.equals(accessor.getCommand())){ log.info("DISCONNECT" ); } return message; } }
Web Socket 설정
STOMP 를 사용하기 위한 설정은 WebSocketMessageBrokerConfigurer
와 @EnableWebSocketMessageBroker
를 이용해 진행한다.
MessageBrokerRegistry
객체를 이용해 Message Broker 설정
setApplicationDestinationPrefixes 메소드를 이용해 Client 가 보내는 요청을 처리하기 위한 Prefix 를 설정한다.
enableSimpleBroker 메소드를 이용해 SimpleBroker 설정을 한다.
SimpleBroker 는 설정된 Prefix 를 subscribe 하는 Client 에게 메시지를 전달하는 역할을 한다.
StompEndpointRegistry
객체를 이용해 End Point 설정
addEndpoint 메소드를 이용해 STOMP 를 사용하기 위한 End Point 를 설정한다.
ChannelRegistration
객체를 이용해 Message Channel 에 Interceptor 를 추가한다.
@Configuration @EnableWebSocketMessageBroker @RequiredArgsConstructor public class WebsocketConfig implements WebSocketMessageBrokerConfigurer { private final StompHandler stompHandler; @Override public void configureMessageBroker (MessageBrokerRegistry brokerRegistry) { brokerRegistry.setApplicationDestinationPrefixes("/pub" ); brokerRegistry.enableSimpleBroker("/sub" ); } @Override public void registerStompEndpoints (StompEndpointRegistry registry) { registry.addEndpoint("/ws-stomp" ) .setAllowedOriginPatterns("*" ) .withSockJS(); } @Override public void configureClientInboundChannel (ChannelRegistration registration) { registration.interceptors(stompHandler); } }
Chat Room 관련 요청을 처리하기 위한 Controller @RequiredArgsConstructor @RestController @RequestMapping("/api/chat") public class ChatRoomController { private final ChatRoomRepository chatRoomRepository; @GetMapping("/room") public String rooms () { return "/chat/room" ; } @GetMapping("/rooms") @ResponseBody public List<ChatRoom> room () { List<ChatRoom> chatRooms = chatRoomRepository.findAllRoom(); chatRooms.stream() .forEach(room -> room.setUserCount(chatRoomRepository.getUserCount(room.getRoomId()))); return chatRooms; } @PostMapping("/room") @ResponseBody public ChatRoom createRoom (@RequestParam String name) { return chatRoomRepository.createChatRoom(name); } @GetMapping("/room/enter/{roomId}") public String roomDetail (Model model, @PathVariable String roomId) { model.addAttribute("roomId" , roomId); return "/chat/roomdetail" ; } @GetMapping("/room/{roomId}") @ResponseBody public ChatRoom roomInfo (@PathVariable String roomId) { return chatRoomRepository.findRoomById(roomId); } }
@Data @NoArgsConstructor @AllArgsConstructor @Builder public class ChatMessage { private Message Type; private String roomId; private String sender; private String message; private Long userCount; public enum Message { ENTER, QUIT, TALK; } }
@Data @NoArgsConstructor public class ChatRoom implements Serializable { private static final long serialVersionUID = 6494678977089006639L ; private String roomId; private String name; private long userCount; public static ChatRoom create (String name) { ChatRoom chatRoom = new ChatRoom (); chatRoom.roomId = UUID.randomUUID().toString(); chatRoom.name = name; return chatRoom; } }
Redis Pub/Sub 모델 사용하기 Message 를 처리하기 위한 Listenr Listner 에서는 전달 받은 Message 를 SimpMessageSendingOperations
를 이용해 현재 소켓에 연결돼 있는 사용자에게 메시지를 전파한다.
@Service @Slf4j @RequiredArgsConstructor public class RedisSubscriber { private final ObjectMapper objectMapper; private final SimpMessageSendingOperations messagingTemplate; public void sendMessage (String publishMessage) { try { ChatMessage chatMessage = objectMapper.readValue(publishMessage, ChatMessage.class); messagingTemplate.convertAndSend("/sub/api/chat/room/" + chatMessage.getRoomId(), chatMessage); } catch (Exception e) { log.error("Exception {}" , e); } } }
MessageListenerAdapter
객체를 이용해 Message 를 처리할 Listner 를 설정해준다.
RedisMessageListenerContainer
객체를 이용해 Topic 과 Listner 를 연결해준다.
Topic 에 Message 가 생성되면 MessageListner 가 해당 Message 를 처리한다.
Redis 설정 하기 @Configuration @EnableRedisRepositories public class RedisConfig { @Bean public ChannelTopic channelTopic () { return new ChannelTopic ("chatroom" ); } @Bean public MessageListenerAdapter messageListenerAdapter (RedisSubscriber redisSubscriber) { return new MessageListenerAdapter (redisSubscriber, "sendMessage" ); } @Bean public RedisMessageListenerContainer redisMessageListener (RedisConnectionFactory redisConnectionFactory, MessageListenerAdapter messageListenerAdapter, ChannelTopic channelTopic) { RedisMessageListenerContainer container = new RedisMessageListenerContainer (); container.setConnectionFactory(redisConnectionFactory); container.addMessageListener(messageListenerAdapter, channelTopic); return container; } @Bean public RedisTemplate<String, Object> redisTemplate (RedisConnectionFactory redisConnectionFactory) { RedisTemplate<String, Object> redisTemplate = new RedisTemplate <>(); redisTemplate.setConnectionFactory(redisConnectionFactory); redisTemplate.setKeySerializer(new StringRedisSerializer ()); redisTemplate.setValueSerializer(new Jackson2JsonRedisSerializer <>(String.class)); return redisTemplate; } }
Redis 에 저장 및 불러오기 위한 Repository 정의
HashOperations
Redis 에 Map 형태 (Key, Value) 로 데이터에 접근 및 저장을 하기 위한 객체
ValueOperations
Redis 에 String 데이터를 접근 및 저장을 위한 객체
@Repository @RequiredArgsConstructor public class ChatRoomRepository { private static final String CHAT_ROOM = "CHAT_ROOM" ; private static final String USER_COUNT = "USER_COUNT" ; private static final String ENTER_INFO = "ENTER_INFO" ; @Resource(name = "redisTemplate") private HashOperations<String, String, ChatRoom> hashOpsChatRoom; @Resource(name = "redisTemplate") private HashOperations<String, String, String> hashOpsEnterInfo; @Resource(name = "redisTemplate") private ValueOperations<String, String> valueOps; public List<ChatRoom> findAllRoom () { return hashOpsChatRoom.values(CHAT_ROOM); } public ChatRoom findRoomById (String id) { return hashOpsChatRoom.get(CHAT_ROOM, id); } public ChatRoom createChatRoom (String name) { ChatRoom chatRoom = ChatRoom.create(name); hashOpsChatRoom.put(CHAT_ROOM, chatRoom.getRoomId(), chatRoom); return chatRoom; } public void setUserEnterInfo (String userSessionId, String roomId) { hashOpsEnterInfo.put(ENTER_INFO, userSessionId, roomId); } public String getUserEnterRoomId (String userSessionId) { return hashOpsEnterInfo.get(ENTER_INFO, userSessionId); } public void removeUserEnterRoomId (String userSessionId) { hashOpsEnterInfo.delete(ENTER_INFO, userSessionId); } public Long getUserCount (String roomId) { return Long.valueOf(Optional.ofNullable(valueOps.get(USER_COUNT+ "_" + roomId)).orElse("0" )); } public Long plusUserCount (String roomId) { return Optional.ofNullable(valueOps.increment(USER_COUNT+"_" +roomId)).orElse(0L ); } public Long minusUserCount (String roomId) { return Optional.ofNullable(valueOps.decrement(USER_COUNT+"_" +roomId)).filter(count -> count > 0 ).orElse(0L ); } }
전달 받은 Message 를 Redis 로 보내기 @RequiredArgsConstructor @Service public class ChatService { private final ChannelTopic channelTopic; private final RedisTemplate redisTemplate; private final ChatRoomRepository chatRoomRepository; public String getRoomId (String destination) { int lastIndex = destination.lastIndexOf("/" ); if (lastIndex != -1 ) { return destination.substring(lastIndex + 1 ); } else { return "" ; } } public void sendChatMessage (ChatMessage message) { String roomId = getRoomId(message.getMessage()); Long userCount = chatRoomRepository.getUserCount(roomId); message.setUserCount(userCount); redisTemplate.convertAndSend(channelTopic.getTopic(), message); } }
Message 요청 처리하기
@MessageMapping
를 이용해 메시지 요청을 받기 위한 경로를 설정한다.
/api/chat/message
경로로 들어온 Message 를 처리한다.
@RestController @RequiredArgsConstructor @Slf4j public class ChatController { private final ChatService chatService; @MessageMapping("/api/chat/message") public void message (ChatMessage message) { chatService.sendChatMessage(message); } }