Reactor 패턴 기반의 HTTP 서버 예제

Reactor 패턴 기반의 HTTP 서버 예제

개념에 대해서는 이전 문서에서 정리를 해두었기에 이번 문서에서는 소스코드만을 정리합니다.

이전 문서인 Reactor 패턴 기반의 소켓 프로그래밍 예제에서 달라진 점은 아래와 같습니다.

  • MsgCodec 을 통해 HTTP 요청을 처리하는 부분이 추가되었습니다.
  • Reactor 클래스의 이름을 EventLoop 로 바꾸었습니다.

예제는 https://github.com/chagchagchag/example-nio-aio/tree/main/src/main/java/io/chagchagchag/example_nio_aio/httpserver 에 정리해두었습니다.


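server/EventLoop.java (※ 아래에는 EventLoop 소스 대신 EventLoop 를 실행하는 HttpServerMain 코드가 실려 있습니다. EventLoop 전체 소스는 위 저장소 링크를 참고하세요.)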

package io.chagchagchag.example_nio_aio.httpserver;
 
import io.chagchagchag.example_nio_aio.httpserver.server.EventLoop;
import java.util.List;
 
/**
 * Entry point of the example: builds the list of event loops and runs each one in turn.
 */
public class HttpServerMain {
  public static void main(String[] args) throws Exception {
    System.out.println(">>> started ");

    // 8080 is presumably the listening port (the example client connects to localhost:8080).
    final List<EventLoop> loops = List.of(new EventLoop(8080));

    for (EventLoop loop : loops) {
      loop.run();
    }

    System.out.println(">>> finished");
  }
}

server/handler/EventHandler.java

package io.chagchagchag.example_nio_aio.httpserver.server.handler;
 
// Common abstraction implemented by both the Acceptor and the Handler.
// Acceptor : dedicated to accepting new client connections
// Handler : dedicated to processing READ events
public interface EventHandler {
  // Handles the event this handler is responsible for; implementations may throw any Exception.
  void handle() throws Exception;
}

server/handler/Acceptor.java

package io.chagchagchag.example_nio_aio.httpserver.server.handler;
 
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
 
/**
 * Event handler that accepts a pending connection on the server channel and hands the
 * accepted client channel to a new {@link HttpEventHandler}.
 */
public class Acceptor implements EventHandler{
  private final Selector selector;
  private final ServerSocketChannel serverChannel;

  /**
   * @param selector selector passed on to every {@link HttpEventHandler} this acceptor creates
   * @param serverChannel listening channel that connections are accepted from
   * @throws NullPointerException if either argument is {@code null}
   */
  public Acceptor(Selector selector, ServerSocketChannel serverChannel) throws Exception {
    // Explicit checks instead of `assert`: assertions are skipped unless the JVM runs with -ea.
    this.selector = java.util.Objects.requireNonNull(selector, "selector");
    this.serverChannel = java.util.Objects.requireNonNull(serverChannel, "serverChannel");
  }

  /**
   * Accepts one connection, if any is pending, and creates its handler.
   *
   * @throws Exception if accepting fails or the handler cannot be set up
   */
  @Override
  public void handle() throws Exception {
    SocketChannel clientChannel = serverChannel.accept();
    // accept() returns null on a non-blocking server channel when no connection is pending.
    if (clientChannel == null) {
      return;
    }

    try {
      // The handler's constructor registers the client channel with the selector itself.
      new HttpEventHandler(selector, clientChannel);
    } catch (Exception e) {
      // Nobody owns the channel if handler setup failed, so close it here to avoid a leak.
      try {
        clientChannel.close();
      } catch (Exception closeFailure) {
        e.addSuppressed(closeFailure);
      }
      throw e;
    }
  }
}

server/handler/HttpEventHandler.java

package io.chagchagchag.example_nio_aio.httpserver.server.handler;
 
import io.chagchagchag.example_nio_aio.httpserver.codec.MsgCodec;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
 
/**
 * Handles one client connection: reads the request from the channel, decodes it with
 * {@link MsgCodec}, and writes the encoded response from a worker thread before closing
 * the channel.
 */
public class HttpEventHandler implements EventHandler {
  // Size of the buffer a request is read into; anything beyond this is not read.
  private static final int REQUEST_BUFFER_SIZE = 1024;

  // Worker pool shared by all handlers; responses are written on these threads.
  private static final ExecutorService executorService = Executors.newFixedThreadPool(50);

  private final Selector selector;
  private final SocketChannel clientChannel;
  private final MsgCodec msgCodec;

  /**
   * Switches the client channel to non-blocking mode and registers it for READ events with
   * this handler as the key's attachment.
   *
   * @param selector selector the client channel is registered with
   * @param clientChannel accepted client connection
   * @throws Exception if the channel cannot be configured or registered
   */
  public HttpEventHandler(Selector selector, SocketChannel clientChannel)
  throws Exception {
    this.selector = selector;
    this.clientChannel = clientChannel;
    this.msgCodec = new MsgCodec();

    this.clientChannel.configureBlocking(false);
    // Register last, passing the attachment in the same call: the key is never visible
    // without its handler, and the handler is fully initialized once it is reachable.
    this.clientChannel.register(selector, SelectionKey.OP_READ, this);
  }

  /**
   * Reads and decodes the request, then schedules the response.
   *
   * @throws RuntimeException wrapping any failure; the channel is closed before rethrowing
   */
  @Override
  public void handle() throws Exception {
    try{
      String requestBody = handleRequest(this.clientChannel);
      System.out.println("requestBody :: " + requestBody);
      sendResponse(clientChannel, requestBody);
    }
    catch (Exception e){
      // No response task owns the channel on this path; close it so the connection (and its
      // selection key) is released instead of staying registered.
      closeQuietly(clientChannel);
      e.printStackTrace();
      System.out.println("message = " + e.getMessage());
      // Keep the original exception as the cause so the stack trace is not lost.
      throw new RuntimeException(e.getMessage(), e);
    }
  }

  /**
   * Performs a single read from the channel and decodes the bytes with the codec.
   *
   * @param clientSocket channel to read from
   * @return the value produced by {@link MsgCodec#decode}
   * @throws IOException if the read fails or the peer has already closed the connection
   */
  public String handleRequest(SocketChannel clientSocket) throws IOException {
    ByteBuffer requestByteBuffer = ByteBuffer.allocateDirect(REQUEST_BUFFER_SIZE);
    int bytesRead = clientSocket.read(requestByteBuffer);
    // read() returns -1 at end-of-stream: the client disconnected without sending a request.
    if (bytesRead < 0) {
      throw new IOException("client closed the connection before sending a request");
    }
    return msgCodec.decode(requestByteBuffer);
  }

  /**
   * Encodes and writes the response on a worker thread, then closes the channel.
   *
   * @param clientSocket channel to write to; always closed when the task finishes
   * @param requestBody value passed to {@link MsgCodec#encode}
   */
  public void sendResponse(SocketChannel clientSocket, String requestBody) throws IOException {
    CompletableFuture.runAsync(() -> {
      try{
        // Short pause kept from the original example; presumably simulates processing time.
        Thread.sleep(10);

        ByteBuffer responseByteBuffer = msgCodec.encode(requestBody);
        // A non-blocking write may send only part of the buffer; repeat until it is drained.
        while (responseByteBuffer.hasRemaining()) {
          clientSocket.write(responseByteBuffer);
        }
      } catch (InterruptedException e){
        // Restore the interrupt flag for the pool thread instead of swallowing it.
        Thread.currentThread().interrupt();
      } catch (Exception e){
        System.out.println("failed to send response :: " + e);
      } finally {
        // Close on every path so a failed write does not leak the connection.
        closeQuietly(clientSocket);
      }
    }, executorService);
  }

  // Closes the channel, reporting (not propagating) a failure to close.
  private static void closeQuietly(SocketChannel channel) {
    try {
      channel.close();
    } catch (IOException e) {
      System.out.println("failed to close channel :: " + e);
    }
  }
}

codec/MsgCodec.java

package io.chagchagchag.example_nio_aio.httpserver.codec;
 
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.stream.Collectors;
 
/**
 * Minimal HTTP codec for the example server: extracts the {@code name} query parameter from
 * a request and builds a small HTML greeting response.
 */
public class MsgCodec {
  private static final String NAME_PARAM = "name";
  private static final String DEFAULT_NAME = "Anonymous";

  /**
   * Builds a complete HTTP/1.1 200 response whose HTML body greets {@code msg}.
   *
   * @param msg text to embed in the body; HTML-escaped because it originates from the request
   * @return the response encoded as UTF-8, ready to be written to a channel
   */
  public ByteBuffer encode(final String msg){
    var body = "<html><body><h1>OK, " + escapeHtml(String.valueOf(msg)) + "!</h1></body></html>";
    // Content-Length counts bytes of the encoding actually sent (UTF-8), not the platform default.
    var contentLength = body.getBytes(StandardCharsets.UTF_8).length;

    // HTTP/1.1 header lines are terminated by CRLF, and a bare CRLF ends the header section.
    var httpResponse = "HTTP/1.1 200 OK\r\n" +
        "Content-Type: text/html; charset=utf-8\r\n" +
        "Content-Length: " + contentLength + "\r\n" +
        "\r\n" + body;

    return StandardCharsets.UTF_8.encode(httpResponse);
  }

  /**
   * Extracts the {@code name} query parameter from the request line of an HTTP request.
   *
   * @param buffer buffer that was just written to; it is flipped here before being read
   * @return the first non-empty {@code name} value, or {@code "Anonymous"} when the request
   *     has no request target, the target is not a valid URI, or no such parameter exists
   */
  public String decode(final ByteBuffer buffer){
    buffer.flip();
    var httpRequest = StandardCharsets.UTF_8.decode(buffer).toString().trim();
    var firstLine = httpRequest.split("\r?\n", 2)[0];

    // Request line is "<method> <target> <version>"; the target is the second token.
    var tokens = firstLine.trim().split("\\s+");
    if (tokens.length < 2) {
      return DEFAULT_NAME;
    }

    String query;
    try {
      query = URI.create(tokens[1]).getQuery();
    } catch (IllegalArgumentException e) {
      // Target is not a syntactically valid URI; treat it like a request without parameters.
      return DEFAULT_NAME;
    }
    if (query == null) {
      return DEFAULT_NAME;
    }

    // Pick the first matching pair directly; collecting into a map would fail on repeated keys.
    return Arrays.stream(query.split("&"))
        .map(pair -> pair.split("=", 2))
        .filter(kv -> kv.length == 2 && NAME_PARAM.equals(kv[0]) && !kv[1].isEmpty())
        .map(kv -> kv[1])
        .findFirst()
        .orElse(DEFAULT_NAME);
  }

  // Replaces the characters that are significant in HTML text and attribute values.
  private static String escapeHtml(final String text) {
    final StringBuilder escaped = new StringBuilder(text.length());
    for (int i = 0; i < text.length(); i++) {
      final char c = text.charAt(i);
      switch (c) {
        case '&':
          escaped.append("&amp;");
          break;
        case '<':
          escaped.append("&lt;");
          break;
        case '>':
          escaped.append("&gt;");
          break;
        case '"':
          escaped.append("&quot;");
          break;
        case '\'':
          escaped.append("&#39;");
          break;
        default:
          escaped.append(c);
      }
    }
    return escaped.toString();
  }
}

클라이언트 - client/JavaIOMultiClient.java

package io.chagchagchag.example_nio_aio.httpserver.client;
 
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
 
/**
 * Test client: sends 10 concurrent requests to localhost:8080 over blocking sockets, prints
 * each response, and reports the total elapsed time.
 */
public class JavaIOMultiClient {
  private static final ExecutorService executorService = Executors.newFixedThreadPool(50);

  public static void main(String[] args) {
    System.out.println("start main");
    List<CompletableFuture<Void>> futures = new ArrayList<>();
    long start = System.currentTimeMillis();

    try {
      for (int i = 0; i < 10; i++) {
        futures.add(CompletableFuture.runAsync(JavaIOMultiClient::sendRequest, executorService));
      }

      CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
    } finally {
      // Always release the pool threads, otherwise the JVM would not exit after a failure.
      executorService.shutdown();
    }

    System.out.println("end main");
    long end = System.currentTimeMillis();
    System.out.println("duration: " + ((end - start) / 1000.0));
  }

  // Opens one connection, sends the request text, and prints whatever a single read returns.
  private static void sendRequest() {
    try (Socket socket = new Socket()) {
      socket.connect(new InetSocketAddress("localhost", 8080));

      OutputStream out = socket.getOutputStream();
      // NOTE: this is not a well-formed HTTP request line; the server's MsgCodec takes the
      // second space-separated token ("is") as the request target.
      String requestBody = "This is client";
      out.write(requestBody.getBytes(java.nio.charset.StandardCharsets.UTF_8));
      out.flush();

      InputStream in = socket.getInputStream();
      byte[] responseBytes = new byte[1024];
      // Decode only the bytes actually received; read() returns -1 if the server closed first.
      int bytesRead = in.read(responseBytes);
      String response = bytesRead <= 0
          ? ""
          : new String(responseBytes, 0, bytesRead, java.nio.charset.StandardCharsets.UTF_8).trim();
      System.out.println("result: " + response);
    } catch (Exception e) {
      // Report the failure instead of silently dropping it (e.g. the server is not running).
      System.err.println("request failed: " + e);
    }
  }
}

HttpServerMain.java

package io.chagchagchag.example_nio_aio.httpserver;
 
import io.chagchagchag.example_nio_aio.httpserver.server.EventLoop;
import java.util.List;
 
/**
 * Entry point of the example: builds the list of event loops and runs each one in turn.
 */
public class HttpServerMain {
  public static void main(String[] args) throws Exception {
    System.out.println(">>> started ");

    // 8080 is presumably the listening port (the example client connects to localhost:8080).
    final List<EventLoop> loops = List.of(new EventLoop(8080));

    for (EventLoop loop : loops) {
      loop.run();
    }

    System.out.println(">>> finished");
  }
}

실행

  • 서버를 먼저 띄웁니다. (HttpServerMain.java)

  • 클라이언트를 띄웁니다. (client/JavaIOMultiClient.java)