Reactor 패턴 기반의 HTTP 서버 예제
개념에 대해서는 이전 문서에서 정리를 해두었기에 이번 문서에서는 소스코드만을 정리합니다.
이전 문서인 Reactor 패턴 기반의 소켓 프로그래밍 예제에서 달라진 점은 아래와 같습니다.
- MsgCodec 을 통해 HTTP 요청을 처리하는 부분이 추가
- Reactor 클래스 명을 EventLoop 라는 이름으로 바꾸었습니다.
예제는 https://github.com/chagchagchag/example-nio-aio/tree/main/src/main/java/io/chagchagchag/example_nio_aio/httpserver (opens in a new tab) 에 정리해두었습니다.
server/EventLoop.java
package io.chagchagchag.example_nio_aio.httpserver;
import io.chagchagchag.example_nio_aio.httpserver.server.EventLoop;
import java.util.List;
public class HttpServerMain {
public static void main(String[] args) throws Exception{
System.out.println(">>> started ");
List<EventLoop> eventLoopList = List.of(
new EventLoop(8080)
);
eventLoopList.forEach(eventLoop -> eventLoop.run());
System.out.println(">>> finished");
}
}
server/handler/EventHandler.java
package io.chagchagchag.example_nio_aio.httpserver.server.handler;
// Acceptor, Handler 에 대한 추상 기능을 제공하는 타입
// Acceptor : Accept 하는 역할을 전담
// Handler : READ 이벤트에 집중해서 처리를 전담
public interface EventHandler {
void handle() throws Exception;
}
server/handler/Acceptor.java
package io.chagchagchag.example_nio_aio.httpserver.server.handler;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
public class Acceptor implements EventHandler{
private final Selector selector;
private final ServerSocketChannel serverChannel;
public Acceptor(Selector selector, ServerSocketChannel serverChannel) throws Exception {
assert selector != null;
assert serverChannel != null;
this.selector = selector;
this.serverChannel = serverChannel;
}
@Override
public void handle() throws Exception {
SocketChannel clientChannel = serverChannel.accept();
new HttpEventHandler(selector, clientChannel);
}
}
server/handler/HttpEventHandler.java
package io.chagchagchag.example_nio_aio.httpserver.server.handler;
import io.chagchagchag.example_nio_aio.httpserver.codec.MsgCodec;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class HttpEventHandler implements EventHandler {
private final Selector selector;
private final SocketChannel clientChannel;
private final MsgCodec msgCodec;
public HttpEventHandler(Selector selector, SocketChannel clientChannel)
throws Exception {
this.selector = selector;
this.clientChannel = clientChannel;
this.clientChannel.configureBlocking(false);
this.clientChannel.register(selector, SelectionKey.OP_READ).attach(this);
this.msgCodec = new MsgCodec();
}
@Override
public void handle() throws Exception {
try{
String requestBody = handleRequest(this.clientChannel);
System.out.println("requestBody :: " + requestBody);
sendResponse(clientChannel, requestBody);
}
catch (Exception e){
e.printStackTrace();
System.out.println("message = " + e.getMessage());
throw new RuntimeException(e.getMessage());
}
}
public String handleRequest(SocketChannel clientSocket) throws IOException {
ByteBuffer requestByteBuffer = ByteBuffer.allocateDirect(1024);
clientSocket.read(requestByteBuffer);
return msgCodec.decode(requestByteBuffer);
}
private static ExecutorService executorService = Executors.newFixedThreadPool(50);
public void sendResponse(SocketChannel clientSocket, String requestBody) throws IOException {
CompletableFuture.runAsync(() -> {
try{
Thread.sleep(10);
ByteBuffer responseByteBuffer = msgCodec.encode(requestBody);
clientSocket.write(responseByteBuffer);
clientSocket.close();
} catch (Exception e){}
}, executorService);
}
}
codec/MsgCodec.java
package io.chagchagchag.example_nio_aio.httpserver.codec;
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.stream.Collectors;
public class MsgCodec {
public ByteBuffer encode(final String msg){
var body = "<html><body><h1>OK, " + msg + "!</h1></body></html>";
var contentLength = body.getBytes().length;
var httpResponse = "HTTP/1.1 200 OK\n" +
"Content-Type: text/html; charset=utf-8\n" +
"Content-Length: " + contentLength + "\n\n" + body;
return StandardCharsets.UTF_8.encode(httpResponse);
}
public String decode(final ByteBuffer buffer){
buffer.flip();
var httpRequest = StandardCharsets.UTF_8.decode(buffer).toString().trim();
var firstLine = httpRequest.split("\n")[0];
var path = firstLine.split(" ")[1];
URI uri = URI.create(path);
var query = uri.getQuery() == null ? "" : uri.getQuery();
var queryMap = Arrays.stream(query.split("&"))
.map(s -> s.split("="))
.filter(s -> s.length == 2)
.collect(Collectors.toMap(s -> s[0], s->s[1]));
return queryMap.getOrDefault("name", "Anonymous");
}
}
클라이언트 - client/JavaIOMultiClient.java
package io.chagchagchag.example_nio_aio.httpserver.client;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class JavaIOMultiClient {
private static ExecutorService executorService = Executors.newFixedThreadPool(50);
public static void main(String[] args) {
System.out.println("start main");
List<CompletableFuture<Void>> futures = new ArrayList<>();
long start = System.currentTimeMillis();
for (int i = 0; i < 10; i++) {
var future = CompletableFuture.runAsync(() -> {
try (Socket socket = new Socket()) {
socket.connect(new InetSocketAddress("localhost", 8080));
OutputStream out = socket.getOutputStream();
String requestBody = "This is client";
out.write(requestBody.getBytes());
out.flush();
InputStream in = socket.getInputStream();
byte[] responseBytes = new byte[1024];
in.read(responseBytes);
System.out.println("result: " + new String(responseBytes).trim());
} catch (Exception e) {}
}, executorService);
futures.add(future);
}
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
executorService.shutdown();
System.out.println("end main");
long end = System.currentTimeMillis();
System.out.println("duration: " + ((end - start) / 1000.0));
}
}
HttpServerMain.java
package io.chagchagchag.example_nio_aio.httpserver;
import io.chagchagchag.example_nio_aio.httpserver.server.EventLoop;
import java.util.List;
public class HttpServerMain {
public static void main(String[] args) throws Exception{
System.out.println(">>> started ");
List<EventLoop> eventLoopList = List.of(
new EventLoop(8080)
);
eventLoopList.forEach(eventLoop -> eventLoop.run());
System.out.println(">>> finished");
}
}
실행
-
서버를 먼저 띄웁니다. (
HttpServerMain.java
) -
클라이언트를 띄웁니다. (
client/JavaIOMultiClient.java
)