DispatcherHandler 와 Spring Webflux
우리는 Servlet 기반의 Spring 을 개발할 때에는 보통 DispatcherServlet의 개념과 HandlerMapping 이 어떻게 매핑되고 요청이 응답되는지를 스터디해왔습니다.
Project Reactor 또는 RxJava, Mutiny 등과 같은 Reactive Manifesto 를 따르는 Reactive Streams 계열의 브러리는 Reactor Netty 환경에서 동작가능합니다.
그리고 Reactor Netty 환경에서는 DispatcherServlet 이 아닌 DispatcherHandler
를 중심으로 Request 와 Response 의 상호작용을 해결합니다.
참고자료
DispatcherHandler 의 request,response 처리 흐름
DispatcherHandler 가 Resquest, Response 를 응답하기 위해 다른 객체들과 상호작용하는 그림을 그려보면 아래와 같습니다. DispatcherServlet 에서 보던 그림과 어느 정도는 유사하기에 친숙하게 느껴집니다.
0.
- 외부로부터 요청이 Netty 에 도착합니다.
- Netty 는 이 요청을 DispatcherHandler 에 전달합니다.
1.
- DispatcherHandler 는 요청의 파라미터 등을 파악해서 이 요청은 어떤 Handler 로 처리하면 맞는지 찾기 위해 HandlerMapping 을 조회합니다.
- 그리고 찾아낸 Handler 는 HandlerAdapter 리스트를 조회해서 어떤 HandlerAdapter가 맞는지를 찾습니다.
- 찾아낸 HandlerAdapter 를 통해 진입한 Handler 또는 Controller 에서는 Service, Repository 등의 비즈니스 로직을 수행합니다.
2.
- DispatcherHandler 는 HandlerAdapter 가 return 한 결과 값에 대해 알맞은 HandlerResult 객체를 HandlerResult 리스트에서 조회합니다.
3.
- DispatcherHandler 는 HandlerResult 를 종류에 맞게 적절한 처리를 할 수 있는 HandlerResultHandler 를 이용해서 사용자에게 응답을 해줍니다.
예를 들면 WebHandler 는 직접 작성한다면 아래와 같이 작성할 수 있습니다.
package io.chagchagchag.example.foobar.spring_webflux;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.http.ResponseCookie;
import org.springframework.http.server.reactive.HttpHandler;
import org.springframework.http.server.reactive.ReactorHttpHandlerAdapter;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpResponse;
import reactor.core.publisher.Mono;
import reactor.netty.http.server.HttpServer;
@Slf4j
public class HttpHandlerExample1 {
@SneakyThrows
public static void main(String[] args) {
log.info("main function started");
var httpHandler = new HttpHandler(){
@Override
public Mono<Void> handle(
ServerHttpRequest request, ServerHttpResponse response
) {
String tickerQueryParam = request.getQueryParams().getFirst("ticker");
String ticker = tickerQueryParam == null ? "MSFT" : tickerQueryParam;
String content = "You picked " + ticker;
log.info("responseBody = {}", content);
Mono<DataBuffer> responseBody = Mono.just(
response.bufferFactory().wrap(content.getBytes())
);
response.addCookie(ResponseCookie.from("ticker", ticker).build());
response.getHeaders().add("Content-Type", "text/plain");
return response.writeWith(responseBody);
}
};
var adapter = new ReactorHttpHandlerAdapter(httpHandler);
HttpServer.create()
.host("localhost").port(8080)
.handle(adapter)
.bindNow()
.channel().closeFuture().sync();
log.info("main function end");
}
}
HttpHandler 는 HttpHandlerAdapter 에 주입 가능합니다. 그리고 HttpServer 는 HttpHandlerAdapter 를 주입받아서 처리할 수 있도록 로직을 작성합니다.
Functional Endpoint, Annotated Controller 의 동작
Spring Webflux 에서는 함수형 엔드포인트, 선언형 컨트롤러(Annotated Controller) 가 있습니다. 두 방식 모두 잘 쓰이는 방식입니다. 개인적으로는 선언형 컨트롤러 (Annotated Controller) 를 선호합니다.
Functional Endpoint 와 Annotated Controller 는 DispatcherServlet 내에서 아래와 같이 동작합니다.
함수형 엔드포인트를 사용할 경우에는 RouterFunction 을 사용한다는 점과 선언형 컨트롤러 (Annotated Controller) 를 사용할 때에는 @RequestMapping 을 사용한다는 점을 기억하면 이해가 쉽습니다.
예를 들어 함수형 엔드포인트는 아래와 같이 Router 를 작성해서 어떤 REST API 의 어떤 METHOD 를 처리할 지를 명시하고 이 Router 를 처리할 Handler 를 정의하는 방식으로 작성합니다.
e.g.
예제 코드는 [https://github.com/chagchagchag/stock-cells-kr/tree/main/backend/stock-cells-kr-backend/src/main/java/io/stock/evaluation/web/price/api (opens in a new tab)] 에서 확인 가능합니다.
PriceApiRouter.java
package io.stock.evaluation.web.price.api;
// ...
@Configuration
public class PriceApiRouter {
@Bean
public RouterFunction<ServerResponse> stockPriceByTickerRouter(PriceApiHandler priceApiHandler){
return RouterFunctions
.route().GET(
"/stock/price",
RequestPredicates.queryParam("ticker", v -> true),
priceApiHandler::getPriceBasicValuation
).build();
}
}
PriceApiHandler.java
package io.stock.evaluation.web.price.api;
import io.stock.evaluation.web.crawling.stock.price.application.CrawlingValuationService;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.core.publisher.Mono;
import static org.springframework.web.reactive.function.server.ServerResponse.notFound;
import static org.springframework.web.reactive.function.server.ServerResponse.ok;
@Component
public class PriceApiHandler {
private final CrawlingValuationService crawlingValuationService;
public PriceApiHandler(CrawlingValuationService crawlingValuationService){
this.crawlingValuationService = crawlingValuationService;
}
public Mono<ServerResponse> getPriceBasicValuation (ServerRequest serverRequest){
return serverRequest.queryParam("ticker")
.map(ticker -> {
return crawlingValuationService.getPriceBasicValuationData(ticker)
.flatMap(cdata -> ok()
.contentType(MediaType.APPLICATION_JSON)
.body(BodyInserters.fromValue(cdata))
.switchIfEmpty(notFound().build())
);
})
.orElse(notFound().build());
}
}
ServerWebExchange
ServerWebExchange 는 ServerHttpRequest, ServerHttpResponse 와 같은 요청객체, 응답객체를 접근가능하도록 하는 메서드를 제공하는 interface 입니다. 그리고 mulitpartData, formData 등을 모두 접근할 수 있는 메서드 역시 추상화 되어 있는 interface 입니다. ServerWebExchange (opens in a new tab) 자체는 interface 이며 구현체로는 DefaultServerWebExchange (opens in a new tab), MockServerWebExchange (opens in a new tab), ServerWebExchangeDecorator (opens in a new tab) 가 있습니다.
Servlet 기반의 Spring 환경에서는 HttpServletRequest, HttpServletResponse 객체를 사용했었습니다. Reactor Netty 기반의 Spring 환경에서는 HttpServerRequest, HttpServerResponse 객체를 요청/응답 객체로 사용한다는 사실을 기억해주시기 바랍니다.
일반적으로 Router 나 Controller 메서드에서 ServerHttpRequest, ServerHttpResponse 를 개별적으로 주입받아서 사용 가능하지만 ServerWebExchange 자체를 주입받아서 사용하는 경우도 있습니다.
ServerWebExchange.java
public interface ServerWebExchange {
String LOG_ID_ATTRIBUTE = ServerWebExchange.class.getName() + ".LOG_ID";
ServerHttpRequest getRequest();
ServerHttpResponse getResponse();
Map<String, Object> getAttributes();
@SuppressWarnings("unchecked")
@Nullable
default <T> T getAttribute(String name) {
return (T) getAttributes().get(name);
}
@SuppressWarnings("unchecked")
default <T> T getRequiredAttribute(String name) {
T value = getAttribute(name);
Assert.notNull(value, () -> "Required attribute '" + name + "' is missing");
return value;
}
@SuppressWarnings("unchecked")
default <T> T getAttributeOrDefault(String name, T defaultValue) {
return (T) getAttributes().getOrDefault(name, defaultValue);
}
Mono<WebSession> getSession();
<T extends Principal> Mono<T> getPrincipal();
Mono<MultiValueMap<String, String>> getFormData();
Mono<MultiValueMap<String, Part>> getMultipartData();
default Mono<Void> cleanupMultipart() {
return getMultipartData()
.onErrorComplete() // ignore errors reading multipart data
.flatMapIterable(Map::values)
.flatMapIterable(Function.identity())
.flatMap(part -> part.delete().onErrorComplete())
.then();
}
LocaleContext getLocaleContext();
@Nullable
ApplicationContext getApplicationContext();
boolean isNotModified();
boolean checkNotModified(Instant lastModified);
boolean checkNotModified(String etag);
boolean checkNotModified(@Nullable String etag, Instant lastModified);
String transformUrl(String url);
void addUrlTransformer(Function<String, String> transformer);
String getLogPrefix();
default Builder mutate() {
return new DefaultServerWebExchangeBuilder(this);
}
interface Builder {
Builder request(Consumer<ServerHttpRequest.Builder> requestBuilderConsumer);
Builder request(ServerHttpRequest request);
Builder response(ServerHttpResponse response);
Builder principal(Mono<Principal> principalMono);
ServerWebExchange build();
}
}
주요 메서드들을 정리해보면 아래와 같습니다.
-
getRequest(), getResponse : ServerHttpRequest, ServerHttpResponse
-
getAttributes() : 요청 중 추가/변경 가능한 key/value 형태의 Map을 접근하는 함수.
-
getSession() : Session 정보를 담고 있는 WebSessions Publisher를 반환
-
getPrincipal() : Security 정보와 관련된 Principal Publisher를 반환
-
getFormData() : Content-Type 이 application/x-www-form-urlencoded 인 데이터에 대해 MultiValueMap 의 형태의 데이터를 제공하는 Publisher 를 반환
-
getMultipartData() : Content-Type 이 multipart/form-data 인 데이터에 대해 body 를 MultiValueMap 형태로 제공
-
getApplicationContext() : Spring 환경에서 구동된 경우 applicationContext 를 반환. Spring 환경에서 구동된 것이 아닐 경우에는 null 을 리턴
WebHandler
ServerWebExchange 단위로 요청을 받으며, WebHandler 내에서 ServerWebExchanbe 객체를 통해 request, response 에 접근 가능합니다.
WebHandler 는 외부로부터 요청이 왔을 때 Reactor Netty와 Dispatcher Handler 를 거쳐서 DispatcherHandler 가 HandlerAdapter 를 찾습니다. 이후 해당 HandlerAdapter의 handle() 메서드 호출시 WebHandlerAdapter 가 적절한 WebHandler를 찾은 후 WebHandler 의 handle() 메서드를 호출하는데, 여기에서 WebHandler의 handle() 메서드를 호출하기 전에 WebFilter의 filter() 가 호출됩니다. 그리고 익셉션 등이 발생할 경우에는 WebExceptionHandler 의 handle() 메서드를 호출하게 됩니다.
HttpHandler
HttpHandler 는 WebHandler 보다 row level 에서 동작하는 Handler 입니다. 그래서 httpHandler 내에서 조금더 상위 버전인 webHandler 를 바인딩해서 기능을 확장했습니다. 자세한 내용은 An introduction to Reactive Web (opens in a new tab) 을 참고하시기 바랍니다.
예를 들어서 HttpHandler 를 직접 작성해서 ReactiveHttpHandlerAdapter 에 바인딩한 후 HttpServer 에 이 HandlerAdapter 를 지정해서 서버를 직접 구동하는 코드는 아래와 같습니다.
package io.chagchagchag.example.foobar.spring_webflux;
// ...
@Slf4j
public class WebHandlerExample1_AcceptOnlyJson {
private static record TickerRecord(
String ticker
){
}
@SneakyThrows
public static void main(String[] args) {
var codecConfigurer = ServerCodecConfigurer.create();
var webHandler = new WebHandler(){
@Override
public Mono<Void> handle(ServerWebExchange exchange) {
final ServerRequest request = ServerRequest.create(exchange, codecConfigurer.getReaders());
final ServerHttpResponse response = exchange.getResponse();
var bodyToMono = request.bodyToMono(TickerRecord.class);
return bodyToMono.flatMap(tickerRecord -> {
String tickerQuery = tickerRecord.ticker();
String ticker = tickerQuery == null ? "NVDA" : tickerQuery;
String content = "You picked " + ticker;
log.info("responseBody : {}", content);
Mono<DataBuffer> responseBody = Mono.just(
response.bufferFactory().wrap(content.getBytes())
);
response.getHeaders().add("Content-Type", "text/plain");
return response.writeWith(responseBody);
});
}
};
final HttpHandler httpHandler = WebHttpHandlerBuilder
.webHandler(webHandler)
.build();
final var adapter = new ReactorHttpHandlerAdapter(httpHandler);
HttpServer.create()
.host("localhost").port(8080)
.handle(adapter)
.bindNow()
.channel().closeFuture().sync();
}
}
코드를 요약해보면 이렇습니다.
WebHandler 를 생성한 후 HttpHandler에 바인딩했습니다. 그리고 생성한 HttpHandler 는 ReactiveHttpHandlerAdapter 내에 바인딩했고, 이렇게 생성한 adapter 는 HttpServer 객체 내에 handle() 메서드를 통해 HttpHandler 를 바인딩합니다.
ServerHttpRequest, ServerHttpResponse
ServerWebExchange 객체로 접근할 수 있는 요청/응답 객체인 ServerHttpRequest, ServerHttpResponse 객체에 대해서 알아봅니다.
위에서 이야기했듯 Reactor Netty 기반의 Spring 환경에서는 HttpServerRequest, HttpServerResponse 객체를 요청/응답 객체로 사용합니다. ServerHttpRequest (opens in a new tab), ServerHttpResponse (opens in a new tab) interface 는 아래와 같이 상위타입인 HttpMessage (opens in a new tab) interface 로 대체가 가능합니다.
ServerHttpRequest
ServerHttpRequest 로 접근 가능한 메서드들은 아래와 같습니다.
-
getCookies() : 클라이언트가 전달하는 read only 쿠키를 Map 으로 제공
-
getPath() : query 를 포함되지 않은 path 를 리턴
-
getQueryParams() : decoded 된 query parameter map 을 return
-
mutate() :
- default 로 선언된 메서드.
- uri, path, header 등을 변경할 수 있는 ServerHttpRequest Builder 를 제공
- ServerHttpRequest 자신을 Builder로 변경할수 있도록 제공되는 메서드
- ServerHttpRequest 에 접근할 수 있는 ServerWebExchange 타입 역시 Builder로 변경할 수 있는 메서드인 mutate() 함수를 제공합니다.
-
getBody() :
- 상위타입인 ReactiveHttpInputMessage 타입에서 제공되는 메서드입니다.
- 클라이언트가 전달하는 request body를 Flux<DataBuffer> 형태로 수신합니다. Flux 이므로 DataBuffer 가 여러번에 걸쳐서 전달됨을 유추 가능합니다.
-
getMethod()
- 상위타입인 HttpRequest 타입에서 제공하는 메서드 입니다.
- HTTP 요청 메서드를 파악할 때 사용합니다.
-
getURI()
- 상위타입인 HttpRequest 에서 제공하는 메서드 입니다.
- query param 이 모두 포함된 전체 URI 정보를 return
-
getHeaders()
- 상위타입인 HttpMessage 타입에서 제공하는 메서드입니다.
- HttpHeaders 객체에 접근하는 메서드. HttpHeaders (opens in a new tab) 클래스는 헤더 추가, 삭제 등 헤더에 관련된 유용한 메서드 들이 존재합니다.
URI
URI 객체는 아래와 같이 구성됩니다.
RequestPath
RequestPath (opens in a new tab) 를 사용하면 contextPath(), pathWithinApplication() 과 같은 메서드 들을 사용할 수 있습니다.
Spring Webflux 는 기본적으로 Root Context Path 를 "/" 으로 갖습니다. 이 값은 spring.webflux.base-path
프로퍼티를 제공해서 변경 가능합니다.
e.g.
spring.webflux.base-path=/wow
e.g.
package io.chagchagchag.example.foobar.spring_webflux;
import java.net.URI;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.server.RequestPath;
@Slf4j
public class URIExample1 {
@SneakyThrows
public static void main(String[] args) {
URI uri = new URI("http://localhost:8080/order/coffee?number=1#detail");
RequestPath requestPath = RequestPath.parse(uri, "/order");
log.info("requestPath.pathWithinApplication() : {}", requestPath.pathWithinApplication());
log.info("requestPath.contextPath() : {}", requestPath.contextPath());
}
}
출력결과
00:52:32.353 [main] INFO io.chagchagchag.example.foobar.spring_webflux.URIExample1 -- requestPath.pathWithinApplication() : /coffee
00:52:32.367 [main] INFO io.chagchagchag.example.foobar.spring_webflux.URIExample1 -- requestPath.contextPath() : /order
ServerHttpResponse
ServerHttpResponse 로 접근 가능한 주요 메서드 들은 아래와 같습니다.
- addCookie(ResponseCode) : Cookie 를 추가하는 데에 사용합니다.
- setStatusCode(HttpStatusCode) : status 를 지정할 때 사용합니다.
- getStatusCode() : status 를 받아올때 사용합니다.
- setComplete() : Response 의 Content 를 추가하기 전에 complete 하도록 하는 메서드 입니다.
- getHeaders() : HttpHeaders 를 return 합니다. HttpHeaders 를 사용하면 header 추가/수정/삭제를 수행가능합니다.