DispatcherHandler 와 Spring Webflux

DispatcherHandler 와 Spring Webflux

우리는 Servlet 기반의 Spring 을 개발할 때에는 보통 DispatcherServlet의 개념과 HandlerMapping 이 어떻게 매핑되고 요청이 응답되는지를 스터디해왔습니다.

Project Reactor 또는 RxJava, Mutiny 등과 같은 Reactive Manifesto 를 따르는 Reactive Streams 계열의 브러리는 Reactor Netty 환경에서 동작가능합니다.

그리고 Reactor Netty 환경에서는 DispatcherServlet 이 아닌 DispatcherHandler 를 중심으로 Request 와 Response 의 상호작용을 해결합니다.

참고자료


DispatcherHandler 의 request,response 처리 흐름

DispatcherHandler 가 Resquest, Response 를 응답하기 위해 다른 객체들과 상호작용하는 그림을 그려보면 아래와 같습니다. DispatcherServlet 에서 보던 그림과 어느 정도는 유사하기에 친숙하게 느껴집니다.


0.

  • 외부로부터 요청이 Netty 에 도착합니다.
  • Netty 는 이 요청을 DispatcherHandler 에 전달합니다.

1.

  • DispatcherHandler 는 요청의 파라미터 등을 파악해서 이 요청은 어떤 Handler 로 처리하면 맞는지 찾기 위해 HandlerMapping 을 조회합니다.
  • 그리고 찾아낸 Handler 는 HandlerAdapter 리스트를 조회해서 어떤 HandlerAdapter가 맞는지를 찾습니다.
  • 찾아낸 HandlerAdapter 를 통해 진입한 Handler 또는 Controller 에서는 Service, Repository 등의 비즈니스 로직을 수행합니다.

2.

  • DispatcherHandler 는 HandlerAdapter 가 return 한 결과 값에 대해 알맞은 HandlerResult 객체를 HandlerResult 리스트에서 조회합니다.

3.

  • DispatcherHandler 는 HandlerResult 를 종류에 맞게 적절한 처리를 할 수 있는 HandlerResultHandler 를 이용해서 사용자에게 응답을 해줍니다.

예를 들면 WebHandler 는 직접 작성한다면 아래와 같이 작성할 수 있습니다.

package io.chagchagchag.example.foobar.spring_webflux;
 
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.http.ResponseCookie;
import org.springframework.http.server.reactive.HttpHandler;
import org.springframework.http.server.reactive.ReactorHttpHandlerAdapter;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpResponse;
import reactor.core.publisher.Mono;
import reactor.netty.http.server.HttpServer;
 
@Slf4j
public class HttpHandlerExample1 {
  @SneakyThrows
  public static void main(String[] args) {
    log.info("main function started");
    var httpHandler = new HttpHandler(){
      @Override
      public Mono<Void> handle(
          ServerHttpRequest request, ServerHttpResponse response
      ) {
        String tickerQueryParam = request.getQueryParams().getFirst("ticker");
        String ticker = tickerQueryParam == null ? "MSFT" : tickerQueryParam;
 
        String content = "You picked " + ticker;
        log.info("responseBody = {}", content);
 
        Mono<DataBuffer> responseBody = Mono.just(
            response.bufferFactory().wrap(content.getBytes())
        );
 
        response.addCookie(ResponseCookie.from("ticker", ticker).build());
        response.getHeaders().add("Content-Type", "text/plain");
        return response.writeWith(responseBody);
      }
    };
 
    var adapter = new ReactorHttpHandlerAdapter(httpHandler);
    HttpServer.create()
        .host("localhost").port(8080)
        .handle(adapter)
        .bindNow()
        .channel().closeFuture().sync();
 
    log.info("main function end");
  }
}
 

HttpHandler 는 HttpHandlerAdapter 에 주입 가능합니다. 그리고 HttpServer 는 HttpHandlerAdapter 를 주입받아서 처리할 수 있도록 로직을 작성합니다.

Functional Endpoint, Annotated Controller 의 동작

Spring Webflux 에서는 함수형 엔드포인트, 선언형 컨트롤러(Annotated Controller) 가 있습니다. 두 방식 모두 잘 쓰이는 방식입니다. 개인적으로는 선언형 컨트롤러 (Annotated Controller) 를 선호합니다.

Functional Endpoint 와 Annotated Controller 는 DispatcherServlet 내에서 아래와 같이 동작합니다.

함수형 엔드포인트를 사용할 경우에는 RouterFunction 을 사용한다는 점과 선언형 컨트롤러 (Annotated Controller) 를 사용할 때에는 @RequestMapping 을 사용한다는 점을 기억하면 이해가 쉽습니다.

예를 들어 함수형 엔드포인트는 아래와 같이 Router 를 작성해서 어떤 REST API 의 어떤 METHOD 를 처리할 지를 명시하고 이 Router 를 처리할 Handler 를 정의하는 방식으로 작성합니다.

e.g.

예제 코드는 [https://github.com/chagchagchag/stock-cells-kr/tree/main/backend/stock-cells-kr-backend/src/main/java/io/stock/evaluation/web/price/api (opens in a new tab)] 에서 확인 가능합니다.


PriceApiRouter.java

package io.stock.evaluation.web.price.api;
 
// ...
 
@Configuration
public class PriceApiRouter {
 
    @Bean
    public RouterFunction<ServerResponse> stockPriceByTickerRouter(PriceApiHandler priceApiHandler){
        return RouterFunctions
                .route().GET(
                        "/stock/price",
                        RequestPredicates.queryParam("ticker", v -> true),
                        priceApiHandler::getPriceBasicValuation
                ).build();
    }
}

PriceApiHandler.java

package io.stock.evaluation.web.price.api;
 
import io.stock.evaluation.web.crawling.stock.price.application.CrawlingValuationService;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.core.publisher.Mono;
 
import static org.springframework.web.reactive.function.server.ServerResponse.notFound;
import static org.springframework.web.reactive.function.server.ServerResponse.ok;
 
@Component
public class PriceApiHandler {
    private final CrawlingValuationService crawlingValuationService;
 
    public PriceApiHandler(CrawlingValuationService crawlingValuationService){
        this.crawlingValuationService = crawlingValuationService;
    }
 
    public Mono<ServerResponse> getPriceBasicValuation (ServerRequest serverRequest){
        return serverRequest.queryParam("ticker")
                .map(ticker -> {
                    return crawlingValuationService.getPriceBasicValuationData(ticker)
                            .flatMap(cdata -> ok()
                                        .contentType(MediaType.APPLICATION_JSON)
                                        .body(BodyInserters.fromValue(cdata))
                                        .switchIfEmpty(notFound().build())
                            );
                })
                .orElse(notFound().build());
    }
}

ServerWebExchange

ServerWebExchange 는 ServerHttpRequest, ServerHttpResponse 와 같은 요청객체, 응답객체를 접근가능하도록 하는 메서드를 제공하는 interface 입니다. 그리고 mulitpartData, formData 등을 모두 접근할 수 있는 메서드 역시 추상화 되어 있는 interface 입니다. ServerWebExchange (opens in a new tab) 자체는 interface 이며 구현체로는 DefaultServerWebExchange (opens in a new tab), MockServerWebExchange (opens in a new tab), ServerWebExchangeDecorator (opens in a new tab) 가 있습니다.

Servlet 기반의 Spring 환경에서는 HttpServletRequest, HttpServletResponse 객체를 사용했었습니다. Reactor Netty 기반의 Spring 환경에서는 HttpServerRequest, HttpServerResponse 객체를 요청/응답 객체로 사용한다는 사실을 기억해주시기 바랍니다.

일반적으로 Router 나 Controller 메서드에서 ServerHttpRequest, ServerHttpResponse 를 개별적으로 주입받아서 사용 가능하지만 ServerWebExchange 자체를 주입받아서 사용하는 경우도 있습니다.

ServerWebExchange.java

public interface ServerWebExchange {
	String LOG_ID_ATTRIBUTE = ServerWebExchange.class.getName() + ".LOG_ID";
	ServerHttpRequest getRequest();
	ServerHttpResponse getResponse();
 
	Map<String, Object> getAttributes();
 
	@SuppressWarnings("unchecked")
	@Nullable
	default <T> T getAttribute(String name) {
		return (T) getAttributes().get(name);
	}
	@SuppressWarnings("unchecked")
	default <T> T getRequiredAttribute(String name) {
		T value = getAttribute(name);
		Assert.notNull(value, () -> "Required attribute '" + name + "' is missing");
		return value;
	}
	@SuppressWarnings("unchecked")
	default <T> T getAttributeOrDefault(String name, T defaultValue) {
		return (T) getAttributes().getOrDefault(name, defaultValue);
	}
	Mono<WebSession> getSession();
	<T extends Principal> Mono<T> getPrincipal();
	Mono<MultiValueMap<String, String>> getFormData();
	Mono<MultiValueMap<String, Part>> getMultipartData();
    
	default Mono<Void> cleanupMultipart() {
		return getMultipartData()
				.onErrorComplete()  // ignore errors reading multipart data
				.flatMapIterable(Map::values)
				.flatMapIterable(Function.identity())
				.flatMap(part -> part.delete().onErrorComplete())
				.then();
	}
    
	LocaleContext getLocaleContext();
    
	@Nullable
	ApplicationContext getApplicationContext();
	boolean isNotModified();
	boolean checkNotModified(Instant lastModified);
	boolean checkNotModified(String etag);
	boolean checkNotModified(@Nullable String etag, Instant lastModified);
	String transformUrl(String url);
	void addUrlTransformer(Function<String, String> transformer);
	String getLogPrefix();
	default Builder mutate() {
		return new DefaultServerWebExchangeBuilder(this);
	}
 
	interface Builder {
		Builder request(Consumer<ServerHttpRequest.Builder> requestBuilderConsumer);
		Builder request(ServerHttpRequest request);
		Builder response(ServerHttpResponse response);
		Builder principal(Mono<Principal> principalMono);
		ServerWebExchange build();
	}
 
}

주요 메서드들을 정리해보면 아래와 같습니다.

  • getRequest(), getResponse : ServerHttpRequest, ServerHttpResponse

  • getAttributes() : 요청 중 추가/변경 가능한 key/value 형태의 Map을 접근하는 함수.

  • getSession() : Session 정보를 담고 있는 WebSessions Publisher를 반환

  • getPrincipal() : Security 정보와 관련된 Principal Publisher를 반환

  • getFormData() : Content-Type 이 application/x-www-form-urlencoded 인 데이터에 대해 MultiValueMap 의 형태의 데이터를 제공하는 Publisher 를 반환

  • getMultipartData() : Content-Type 이 multipart/form-data 인 데이터에 대해 body 를 MultiValueMap 형태로 제공

  • getApplicationContext() : Spring 환경에서 구동된 경우 applicationContext 를 반환. Spring 환경에서 구동된 것이 아닐 경우에는 null 을 리턴


WebHandler

ServerWebExchange 단위로 요청을 받으며, WebHandler 내에서 ServerWebExchanbe 객체를 통해 request, response 에 접근 가능합니다.

WebHandler 는 외부로부터 요청이 왔을 때 Reactor Netty와 Dispatcher Handler 를 거쳐서 DispatcherHandler 가 HandlerAdapter 를 찾습니다. 이후 해당 HandlerAdapter의 handle() 메서드 호출시 WebHandlerAdapter 가 적절한 WebHandler를 찾은 후 WebHandler 의 handle() 메서드를 호출하는데, 여기에서 WebHandler의 handle() 메서드를 호출하기 전에 WebFilter의 filter() 가 호출됩니다. 그리고 익셉션 등이 발생할 경우에는 WebExceptionHandler 의 handle() 메서드를 호출하게 됩니다.


HttpHandler

HttpHandler 는 WebHandler 보다 row level 에서 동작하는 Handler 입니다. 그래서 httpHandler 내에서 조금더 상위 버전인 webHandler 를 바인딩해서 기능을 확장했습니다. 자세한 내용은 An introduction to Reactive Web (opens in a new tab) 을 참고하시기 바랍니다.


예를 들어서 HttpHandler 를 직접 작성해서 ReactiveHttpHandlerAdapter 에 바인딩한 후 HttpServer 에 이 HandlerAdapter 를 지정해서 서버를 직접 구동하는 코드는 아래와 같습니다.

package io.chagchagchag.example.foobar.spring_webflux;
 
// ...
 
 
@Slf4j
public class WebHandlerExample1_AcceptOnlyJson {
  private static record TickerRecord(
      String ticker
  ){
 
  }
 
  @SneakyThrows
  public static void main(String[] args) {
    var codecConfigurer = ServerCodecConfigurer.create();
    var webHandler = new WebHandler(){
      @Override
      public Mono<Void> handle(ServerWebExchange exchange) {
        final ServerRequest request = ServerRequest.create(exchange, codecConfigurer.getReaders());
        final ServerHttpResponse response = exchange.getResponse();
 
        var bodyToMono = request.bodyToMono(TickerRecord.class);
        return bodyToMono.flatMap(tickerRecord -> {
          String tickerQuery = tickerRecord.ticker();
          String ticker = tickerQuery == null ? "NVDA" : tickerQuery;
 
          String content = "You picked " + ticker;
          log.info("responseBody : {}", content);
 
          Mono<DataBuffer> responseBody = Mono.just(
              response.bufferFactory().wrap(content.getBytes())
          );
 
          response.getHeaders().add("Content-Type", "text/plain");
          return response.writeWith(responseBody);
        });
      }
    };
 
    final HttpHandler httpHandler = WebHttpHandlerBuilder
        .webHandler(webHandler)
        .build();
 
    final var adapter = new ReactorHttpHandlerAdapter(httpHandler);
    HttpServer.create()
        .host("localhost").port(8080)
        .handle(adapter)
        .bindNow()
        .channel().closeFuture().sync();
  }
}
 

코드를 요약해보면 이렇습니다.

WebHandler 를 생성한 후 HttpHandler에 바인딩했습니다. 그리고 생성한 HttpHandler 는 ReactiveHttpHandlerAdapter 내에 바인딩했고, 이렇게 생성한 adapter 는 HttpServer 객체 내에 handle() 메서드를 통해 HttpHandler 를 바인딩합니다.


ServerHttpRequest, ServerHttpResponse

ServerWebExchange 객체로 접근할 수 있는 요청/응답 객체인 ServerHttpRequest, ServerHttpResponse 객체에 대해서 알아봅니다.

위에서 이야기했듯 Reactor Netty 기반의 Spring 환경에서는 HttpServerRequest, HttpServerResponse 객체를 요청/응답 객체로 사용합니다. ServerHttpRequest (opens in a new tab), ServerHttpResponse (opens in a new tab) interface 는 아래와 같이 상위타입인 HttpMessage (opens in a new tab) interface 로 대체가 가능합니다.


ServerHttpRequest

ServerHttpRequest 로 접근 가능한 메서드들은 아래와 같습니다.

  • getCookies() : 클라이언트가 전달하는 read only 쿠키를 Map 으로 제공

  • getPath() : query 를 포함되지 않은 path 를 리턴

  • getQueryParams() : decoded 된 query parameter map 을 return

  • mutate() :

    • default 로 선언된 메서드.
    • uri, path, header 등을 변경할 수 있는 ServerHttpRequest Builder 를 제공
    • ServerHttpRequest 자신을 Builder로 변경할수 있도록 제공되는 메서드
    • ServerHttpRequest 에 접근할 수 있는 ServerWebExchange 타입 역시 Builder로 변경할 수 있는 메서드인 mutate() 함수를 제공합니다.
  • getBody() :

    • 상위타입인 ReactiveHttpInputMessage 타입에서 제공되는 메서드입니다.
    • 클라이언트가 전달하는 request body를 Flux<DataBuffer> 형태로 수신합니다. Flux 이므로 DataBuffer 가 여러번에 걸쳐서 전달됨을 유추 가능합니다.
  • getMethod()

    • 상위타입인 HttpRequest 타입에서 제공하는 메서드 입니다.
    • HTTP 요청 메서드를 파악할 때 사용합니다.
  • getURI()

    • 상위타입인 HttpRequest 에서 제공하는 메서드 입니다.
    • query param 이 모두 포함된 전체 URI 정보를 return
  • getHeaders()

    • 상위타입인 HttpMessage 타입에서 제공하는 메서드입니다.
    • HttpHeaders 객체에 접근하는 메서드. HttpHeaders (opens in a new tab) 클래스는 헤더 추가, 삭제 등 헤더에 관련된 유용한 메서드 들이 존재합니다.

URI

URI 객체는 아래와 같이 구성됩니다.


RequestPath

RequestPath (opens in a new tab) 를 사용하면 contextPath(), pathWithinApplication() 과 같은 메서드 들을 사용할 수 있습니다.

Spring Webflux 는 기본적으로 Root Context Path 를 "/" 으로 갖습니다. 이 값은 spring.webflux.base-path 프로퍼티를 제공해서 변경 가능합니다.

e.g.

spring.webflux.base-path=/wow

e.g.

package io.chagchagchag.example.foobar.spring_webflux;
 
import java.net.URI;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.server.RequestPath;
 
@Slf4j
public class URIExample1 {
  @SneakyThrows
  public static void main(String[] args) {
    URI uri = new URI("http://localhost:8080/order/coffee?number=1#detail");
    RequestPath requestPath = RequestPath.parse(uri, "/order");
    log.info("requestPath.pathWithinApplication() : {}", requestPath.pathWithinApplication());
    log.info("requestPath.contextPath() : {}", requestPath.contextPath());
  }
}

출력결과

00:52:32.353 [main] INFO io.chagchagchag.example.foobar.spring_webflux.URIExample1 -- requestPath.pathWithinApplication() : /coffee
00:52:32.367 [main] INFO io.chagchagchag.example.foobar.spring_webflux.URIExample1 -- requestPath.contextPath() : /order

ServerHttpResponse

ServerHttpResponse 로 접근 가능한 주요 메서드 들은 아래와 같습니다.

  • addCookie(ResponseCode) : Cookie 를 추가하는 데에 사용합니다.
  • setStatusCode(HttpStatusCode) : status 를 지정할 때 사용합니다.
  • getStatusCode() : status 를 받아올때 사용합니다.
  • setComplete() : Response 의 Content 를 추가하기 전에 complete 하도록 하는 메서드 입니다.
  • getHeaders() : HttpHeaders 를 return 합니다. HttpHeaders 를 사용하면 header 추가/수정/삭제를 수행가능합니다.