Websocket 개념, Spring Webflux, 예제
Websocket 프로토콜
Websocket 프로토콜은 Application, Presentation, Session, Transport 계층에 걸쳐서 기술이 이뤄져있습니다. 4개의 계층에 골고루 Websocket 통신에 필요한 기술을 사용하고 있습니다. Websocket 은 양방향 통신이 가능하며, HTTP 와 다르게 지속적으로 연결을 유지하고 있을 경우에는 오버헤드가 적습니다.
Websocket Connection 획득 과정
웹소켓 엔드포인트에 요청
GET ws://localhost:8080/chat
웹소켓 커넥션 응답, 요청 수립
정상적인 요청일 경우 서버로부터 아래와 같이 응답이 옵니다.
Handshake status: 101 (Switching Protocols)
upgrade: websocket
connection: upgrade
서버에서 요청을 수립했을 때 응답으로 아래와 같이 upgrade 헤더가 내려온다는 것을 기억하시기 바랍니다.
- upgrade: websocket
- connection: upgrade
HandlerMapping, HandlerAdapter, SimpleUrlHandlerMapping
Websocket Request 매핑 시 HandlerAdapter와 HandlerAdapter 는 무엇을 사용하는지, HandlerMapping 은 어떤 것이 사용되는지, 실제 매핑되는 WebsocketHandler는 무엇인지를 그림으로 그려보면 아래와 같습니다. 아래 그림에서 검은색 배경과 노란 글씨로 표현한 부분들이 Websocket 요청/응답을위한 HandlerMappingAdapter, HandlerMapping, Handler 들 입니다.
SimpleUrlHandlerMapping (opens in a new tab), SimpeUrlHandlerMapping(code) (opens in a new tab)
WebSocketSession
WebSocketSession (opens in a new tab) , WebSocketSession(code) (opens in a new tab)
package org.springframework.web.reactive.socket;
// ...
public interface WebSocketSession {
String getId();
HandshakeInfo getHandshakeInfo();
DataBufferFactory bufferFactory();
Map<String, Object> getAttributes();
Flux<WebSocketMessage> receive();
Mono<Void> send(Publisher<WebSocketMessage> messages);
boolean isOpen();
default Mono<Void> close() {
return close(CloseStatus.NORMAL);
}
Mono<Void> close(CloseStatus status);
Mono<CloseStatus> closeStatus();
WebSocketMessage textMessage(String payload);
WebSocketMessage binaryMessage(Function<DataBufferFactory, DataBuffer> payloadFactory);
WebSocketMessage pingMessage(Function<DataBufferFactory, DataBuffer> payloadFactory);
WebSocketMessage pongMessage(Function<DataBufferFactory, DataBuffer> payloadFactory);
}
bufferFactory
- DataBufferFactory 로 변환 후 DataBuffer 로 변환할 수 있도록 제공되는 필드
getAttributes
- attribute 들을 얻어오는 메서드
receive
- 클라이언트로부터 Flux 타입으로 WebSocketMessage 를 받는 메서드입니다.
send
- WebSocketMessagePublisher 를 이용해서 WebSocketMessage를 전달할 때 사용하는 메서드입니다.
isOpen
- WebSocketSession 이 열려있는지 체크합니다.
close
- close() : WebSocket 을 NORMAL 상태로 닫을 때 사용하는 메서드입니다.
- close(CloseStatus satus) :
- WebSocket 을 전달받은 status 를 지정해서 닫는 용도의 메서드입니다.
WebSocketSession 내의 factory 메서드
AbstractWebSocketSession (opens in a new tab) , AbstractWebSocketSession.java (code) (opens in a new tab)
public abstract class AbstractWebSocketSession<T> implements WebSocketSession {
// ...
@Override
public WebSocketMessage textMessage(String payload) {
byte[] bytes = payload.getBytes(StandardCharsets.UTF_8);
DataBuffer buffer = bufferFactory().wrap(bytes);
return new WebSocketMessage(WebSocketMessage.Type.TEXT, buffer);
}
@Override
public WebSocketMessage binaryMessage(Function<DataBufferFactory, DataBuffer> payloadFactory) {
DataBuffer payload = payloadFactory.apply(bufferFactory());
return new WebSocketMessage(WebSocketMessage.Type.BINARY, payload);
}
@Override
public WebSocketMessage pingMessage(Function<DataBufferFactory, DataBuffer> payloadFactory) {
DataBuffer payload = payloadFactory.apply(bufferFactory());
return new WebSocketMessage(WebSocketMessage.Type.PING, payload);
}
@Override
public WebSocketMessage pongMessage(Function<DataBufferFactory, DataBuffer> payloadFactory) {
DataBuffer payload = payloadFactory.apply(bufferFactory());
return new WebSocketMessage(WebSocketMessage.Type.PONG, payload);
}
}
textMessage(String payload)
- 주어진 payload 를 bufferFactory 를 이용해서 WebSocketMessage 로 변환합니다.
binaryMessage(Function<DataBufferFactory, DataBuffer>)
- 인자값으로 전달받은 Function 을 통해서 변환된 DataBuffer 를 WebSocketMessage 로 변경해줍니다.
pingMessage(Function<DataBufferFactory, DataBuffer>), pongMessage(Function<DataBufferFactory, DataBuffer>)
- 인자값으로 전달받은 Function 을 통해서 변환된 DataBuffer 를 WebSocketMessage 로 변환해줍니다.
- 흔히 알려진 PING/PONG 메시지 교환입니다. 서버 또는 클라이언트가 pingMessage 를 전달하면 반대쪽에서는 pongMessage를 반환해야 합니다.
WebSocketMessage
WebSocketMessage (opens in a new tab), WebSocketMessage.java (code) (opens in a new tab)
package org.springframework.web.reactive.socket;
// ...
public class WebSocketMessage {
private static final boolean reactorNetty2Present = ClassUtils.isPresent(
"io.netty5.handler.codec.http.websocketx.WebSocketFrame", WebSocketMessage.class.getClassLoader());
private final Type type;
private final DataBuffer payload;
// ...
public String getPayloadAsText() {
return getPayloadAsText(StandardCharsets.UTF_8);
}
public String getPayloadAsText(Charset charset) {
return this.payload.toString(charset);
}
/**
* WebSocket message types.
*/
public enum Type {
/**
* Text WebSocket message.
*/
TEXT,
/**
* Binary WebSocket message.
*/
BINARY,
/**
* WebSocket ping.
*/
PING,
/**
* WebSocket pong.
*/
PONG
}
// ...
}
- type : WebSocketMessage 내에 정의된 enum 인
Type
을 의미하며, TEXT, BINARY, PING, PONG 이 있습니다. - payload : socketMassage 에 담긴 payload 입니다.
- getPayloadAsText : payload 를 String 형태로 변환해줍니다.
CloseStatus
CloseStatus (opens in a new tab) , CloseStatus (code) (opens in a new tab)
package org.springframework.web.socket;
// ...
public final class CloseStatus implements Serializable {
private static final long serialVersionUID = 5199057709285570947L;
public static final CloseStatus NORMAL = new CloseStatus(1000);
public static final CloseStatus GOING_AWAY = new CloseStatus(1001);
public static final CloseStatus PROTOCOL_ERROR = new CloseStatus(1002);
public static final CloseStatus NOT_ACCEPTABLE = new CloseStatus(1003);
// 10004: Reserved.
// The specific meaning might be defined in the future.
public static final CloseStatus NO_STATUS_CODE = new CloseStatus(1005);
public static final CloseStatus NO_CLOSE_FRAME = new CloseStatus(1006);
public static final CloseStatus BAD_DATA = new CloseStatus(1007);
public static final CloseStatus POLICY_VIOLATION = new CloseStatus(1008);
public static final CloseStatus TOO_BIG_TO_PROCESS = new CloseStatus(1009);
public static final CloseStatus REQUIRED_EXTENSION = new CloseStatus(1010);
public static final CloseStatus SERVER_ERROR = new CloseStatus(1011);
public static final CloseStatus SERVICE_RESTARTED = new CloseStatus(1012);
public static final CloseStatus SERVICE_OVERLOAD = new CloseStatus(1013);
public static final CloseStatus TLS_HANDSHAKE_FAILURE = new CloseStatus(1015);
public static final CloseStatus SESSION_NOT_RELIABLE =
new CloseStatus(4500).withReason("Failed to send message within the configured send limit");
private final int code;
@Nullable
private final String reason;
public CloseStatus(int code) {
this(code, null);
}
// ...
}
이 중 주요 필드를 요약해보면 아래와 같습니다. 아래 필드 외의 다른 필드들은 CloseStatus (code) (opens in a new tab) 내의 주석을 참고해주시기 바랍니다.
- 1000 : NORMAL
- 정상종료를 의미합니다.
- 1001 :
GOING_AWAY
- 서버가 예상치 못하게 종료되거나 페이지에서 벗어난 경우를 의미합니다.
- 1002 :
PROTOCOL_ERROR
- 프로토콜에 문제를 의미하니다.
- 1003 :
NOT_ACCEPTABLE
- ACCEPT 불가능한 데이터를 요청으로 받았을 때 보
- 1011 :
SERVER_ERROR
- 예상하지 못한 에러. 서버에서 요청을 처리하지 못하는 경우
- 1012 :
SERVICE_RESTARTED
- 서비스가 재시작됨을 의미
- 이 때 클라이언트는 5~30 초 내에 랜덤하게 접근합니다.
e.g.
먼저 코드부터 남기고 이 글의 하단부에 각 컴포넌트 등의 코드에 대한 설명을 남기도록 하겠습니다.
예제 시나리오
사용자 a
가 아래와 같이 접속
WEBSOCKET ws://localhost:8080/chatting
X-USER-NAME: a
사용자 b
가 아래와 같이 접속
WEBSOCKET ws://localhost:8080/chatting
X-USER-NAME: b
사용자 a의 채팅 진행
- 메시지 앞에
b:
를 붙여서 메시지를 보냅니다. [받을 사람]:
과 같은 형식입니다.
Handshake status: 101 (Switching Protocols)
set-cookie: SESSION=7873bcfa-fe61-40d9-8ce0-8a233c50883a; Path=/; HTTPOnly; SameSite=Lax
upgrade: websocket
connection: upgrade
sec-websocket-accept: 7Xs+mSGpH81i7eaY84W8Bd4QHsI=
(System): a님이 채팅방에 입장하셨습니다.
b: 안녕하세요 b님
b: 안녕하세요 a님
b: 잘지내시나요
b: 네 잘 지내고 있어요
b: 그렇군요
b: 아주 좋습니다.
Connection closed
사용자 b 의 채팅 진행
- 메시지 앞에
b:
를 붙여서 메시지를 보냅니다. [받을 사람]:
과 같은 형식입니다.
WEBSOCKET ws://localhost:8080/chatting
Handshake status: 101 (Switching Protocols)
set-cookie: SESSION=95629724-a649-4efe-934d-90c8db45a28d; Path=/; HTTPOnly; SameSite=Lax
upgrade: websocket
connection: upgrade
sec-websocket-accept: IEz5SEgtKBJFMsmO1znelHu3ia4=
(System): b님이 채팅방에 입장하셨습니다.
a: 안녕하세요 b님
b: 안녕하세요 a님
b: 안녕하세요 a님
b: 안녕하세요 a님
b: 안녕하세요 a님
a: 안녕하세요 a님
a: 잘지내시나요
a: 네 잘 지내고 있어요
a: 그렇군요
a: 아주 좋습니다.
Connection closed
WebSocketConfig
먼저 WebsocketConfig 예제입니다.
WebSocketHandlerAdapter 를 Bean 으로 등록합니다. 이 WebSocketHandlerAdapter 는 CustomWebSocketService 를 내부에 바인딩하고 있습니다. CustomWebSocketService 는 HandshakeWebSocketService (opens in a new tab) 를 extends 한 사용자 정의 클래스입니다.
위에서 이미 설명했듯 HandlerAdapter 는 HandlerMapping 을 찾은 후 찾은 HandlerMapping 을 어떻게 처리해야 할지를 정의해야 하는데, 이번 WebSocketHandlerAdapter 예제는 찾아낸 매핑에 맞는 CustomHandshakeWebsocketService (HandshakeWebSocketService) 타입을 연결해주고 있습니다.
찾아낸 HandlerMapping은 미리 정의해둔 빈인데, 여기에 대해서는 이 글을 계속 읽다보면 정리한 내용이 나옵니다.
package io.chagchagchag.example.foobar.websocket.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.socket.server.support.HandshakeWebSocketService;
import org.springframework.web.reactive.socket.server.support.WebSocketHandlerAdapter;
@Configuration
public class WebsocketConfig {
@Bean
public WebSocketHandlerAdapter webSocketHandlerAdapter(
HandshakeWebSocketService customHandshakeWebsocketService
){
// websocket 에서 attribute 를 바이패스
customHandshakeWebsocketService.setSessionAttributePredicate(s -> true);
return new WebSocketHandlerAdapter(customHandshakeWebsocketService);
}
}
CustomHandshakeWebSocketService
직접 정의한 CustomHandshakeWebSocketService 는 아래와 같습니다.
아래 코드는 단순히 헤더의 파라미터를 웹소켓 세션에 바인딩해주는 역할을 합니다.
사용자의 WEBSOCKET ws://localhost:8080/chatting
요청시에 헤더에 X-USER-NAME
이 있을 경우 이 X-USER-NAME
에 대한 값을 얻어온 후 세션에 user-name: X-USER-NAME 값
으로 바인딩해주는 역할을 합니다.
package io.chagchagchag.example.foobar.websocket.config;
import java.util.Optional;
import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.server.support.HandshakeWebSocketService;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;
@Component
public class CustomHandshakeWebsocketService extends HandshakeWebSocketService {
@Override
public Mono<Void> handleRequest(ServerWebExchange exchange, WebSocketHandler handler) {
String userName = exchange
.getRequest()
.getHeaders()
.getFirst("X-USER-NAME");
return Optional.ofNullable(userName)
.map(_userName -> {
return exchange.getSession()
.flatMap(webSession -> {
webSession.getAttributes().put("user-name", _userName);
return super.handleRequest(exchange, handler);
});
})
.orElseGet(() -> {
exchange.getResponse().setStatusCode(HttpStatus.UNAUTHORIZED);
return exchange.getResponse().setComplete();
});
}
}
MappingConfig
SimpleUrlHandlerMapping (opens in a new tab) 빈을 생성해서 등록하는 역할을 합니다.
등록 시에 /chatting
에 대해 WebSocketHandler 인스턴스를 매핑해둔 Map 을 SimpleUrlHandlerMapping 의 urlMap 으로 등록해줍니다.
/chatting
:chattingWebsocketHandler
- 위의 key : value 를 기반으로 하는 Map 을 SimpleUrlHandlerMapping 내의 urlMap 에 등록
아래 코드에서 사용한 ChattingWebSocketHandler 는 WebSocketHandler 인터페이스를 implements 한 직접 정의한 사용자 정의 WebSocketHandler 입니다.
package io.chagchagchag.example.foobar.websocket.config;
import io.chagchagchag.example.foobar.websocket.handler.ChattingWebsocketHandler;
import java.util.Map;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.handler.SimpleUrlHandlerMapping;
import org.springframework.web.reactive.socket.WebSocketHandler;
@Configuration
public class MappingConfig {
@Bean
public SimpleUrlHandlerMapping simpleUrlHandlerMapping(
ChattingWebsocketHandler chattingWebsocketHandler
){
Map<String, WebSocketHandler> urlMapper = Map.of(
"/chatting", chattingWebsocketHandler
);
SimpleUrlHandlerMapping mapping = new SimpleUrlHandlerMapping();
mapping.setOrder(1);
mapping.setUrlMap(urlMapper);
return mapping;
}
}
ChattingWebsocketHandler
WebSocketHandler 인터페이스를 implements 한 사용자 정의 WebSocketHandler 는 아래와 같습니다.
ChattingWebSocketHandler 는 WebSocketHandler interface 에서 제공하는 명세를 따라 handle()
메서드를 제공합니다.
package io.chagchagchag.example.foobar.websocket.handler;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketSession;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@RequiredArgsConstructor
@Component
public class ChattingWebsocketHandler implements WebSocketHandler {
private final ChattingService chattingService;
@Override
public Mono<Void> handle(WebSocketSession session) {
String userName = (String) session.getAttributes().get("user-name");
Flux<Chat> chattingFlux = chattingService.register(userName);
chattingService.sendMessage(userName, new Chat(userName + "님이 채팅방에 입장하셨습니다.", "(System)"));
session
.receive()
.doOnNext(webSocketMessage -> {
String payload = webSocketMessage.getPayloadAsText();
String [] args = payload.split(":");
String to = args[0].trim();
String message = args[1].trim();
final boolean result = chattingService.sendMessage(to, new Chat(message, userName));
if(!result)
chattingService.sendMessage(userName, new Chat("대화상대가 없어요", "(System)"));
})
.subscribe();
return session.send(
chattingFlux.map(chat -> session.textMessage(chat.from() + ": " + chat.message()))
);
}
}
handle() 메서드에서는 의존성 주입받은 ChattingService 를 이용해서 아래의 두가지 작업을 합니다.
- register : 사용자 입장시
[user-name]
:Sinks.Many<Chat>
를 put 해서 업데이트 합니다. - sendMessage
- chattingService 내의 sendMessage() 함수를 호출합니다.
- Sinks.Many 의 tryEmitNext() 함수를 통해서 Sink 의 Next 연산 람다식내에 인자로 전달받은
Chat
데이터를 emit 해주게하는 식을 Sinks.Many 에 등록합니다.
ChattingService 내의 연산은 다음 섹션에서 설명합니다.
마지막으로 WebSocketSession 의 send 연산을 수행하는데, 이때 이전에 등록해둔 Sinks.Many 에 tryEmit 을 하는 람다식을 실행하는데, 이 플럭스에 대해서 [발신자]: [메시지]
의 형태의 문자열로 변환을 한 메시지를 전송합니다.
ChattingService
ChattingService 클래스는 Sinks 를 통해 트리거 이벤트를 발동시키는 역할을 하는 메서드 들을 정의해두었습니다.
package io.chagchagchag.example.foobar.websocket.handler;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;
import reactor.core.publisher.Sinks.Many;
@Slf4j
@Service
public class ChattingService {
private static Map<String, Many<Chat>> chattingSinkMap = new ConcurrentHashMap<>();
public Flux<Chat> register(String userName){
Many<Chat> sink = Sinks.many().unicast().onBackpressureBuffer();
chattingSinkMap.put(userName, sink);
return sink.asFlux();
}
public boolean sendMessage(String userName, Chat chat){
log.info("userName : {}, chat {}", userName, chat);
return Optional.ofNullable(chattingSinkMap.get(userName))
.map(chatMany -> {
if(chatMany.currentSubscriberCount() == 0) return Boolean.FALSE;
chatMany.tryEmitNext(chat);
return Boolean.TRUE;
})
.orElseGet(()->Boolean.FALSE);
}
}
- register()
- 사용자 입장시
[user-name]
:Sinks.Many<Chat>
를 CuncurrentHashMap 에 put 해서 업데이트 합니다. - 가급적이면 실무에서 사용한다면, 캐시 자료구조를 사용하는 것이 추천됩니다.
- 사용자 입장시
- sendMessage()
- sendMessage() 함수에서는 Sinks.Many 의 tryEmitNext() 함수를 통해서 Sink 의 Next 연산 람다식내에 인자로 전달받은
Chat
데이터를 emit 해주게하는 식을 Sinks.Many 에 등록합니다.
- sendMessage() 함수에서는 Sinks.Many 의 tryEmitNext() 함수를 통해서 Sink 의 Next 연산 람다식내에 인자로 전달받은
Chat
데이터를 보내는 단위 데이터 클래스입니다.
package io.chagchagchag.example.foobar.websocket.handler;
public record Chat(
String message,
String from
) {
}
웹소켓 메시지 전송 방법
intellij 에서 제공하는 HTTP Client 기능을 활용합니다. 서버에서 complete() 를 보내지 않는 한 연결이 계속 이어져 있게 됩니다.