Reactor Pattern Socket Communication
The concepts were covered in a previous post, so this post only walks through the source code.
The example code is available at https://github.com/chagchagchag/example-nio-aio/tree/main/src/main/java/io/chagchagchag/example_nio_aio/reactor .
server/Reactor.java
package io.chagchagchag.example_nio_aio.reactor.server;

import io.chagchagchag.example_nio_aio.reactor.server.handler.Acceptor;
import io.chagchagchag.example_nio_aio.reactor.server.handler.EventHandler;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class Reactor implements Runnable {
    // A reactor must be able to run on its own thread, so it implements Runnable.
    private static final ExecutorService executorService = Executors.newSingleThreadExecutor();

    private final int port;
    private final ServerSocketChannel serverChannel;
    private final Selector selector;
    private final Acceptor acceptor;

    public Reactor(int port) throws Exception {
        assert port != 0;
        this.port = port;
        this.serverChannel = ServerSocketChannel.open();
        serverChannel.bind(new InetSocketAddress("localhost", port));
        serverChannel.configureBlocking(false);
        this.selector = Selector.open();
        this.acceptor = new Acceptor(selector, serverChannel);
        // Attach the Acceptor so dispatch() can retrieve it from the key on OP_ACCEPT.
        serverChannel.register(selector, SelectionKey.OP_ACCEPT, acceptor);
    }

    @Override
    public void run() {
        // The lambda body never completes normally, so it is resolved as a Callable
        // and may throw checked exceptions such as the IOException from select().
        executorService.submit(() -> {
            while (true) {
                selector.select();
                Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
                while (selectedKeys.hasNext()) {
                    SelectionKey key = selectedKeys.next();
                    selectedKeys.remove();
                    try {
                        dispatch(key); // hand the SelectionKey to the dispatcher
                    } catch (Exception e) {
                        // Keep the event loop alive when a single handler fails.
                        log.error("handler failed", e);
                        if (key.channel() != serverChannel) {
                            key.channel().close(); // drop only the failing client connection
                        }
                    }
                }
            }
        });
    }

    public void dispatch(SelectionKey selectionKey) throws Exception {
        // Retrieve the handler object that was attached at registration time.
        EventHandler eventHandler = (EventHandler) selectionKey.attachment();
        // Then run handle() through that handler, skipping keys that were cancelled meanwhile.
        if (selectionKey.isValid()
            && (selectionKey.isReadable() || selectionKey.isAcceptable())) {
            eventHandler.handle();
        }
    }
}
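The lambda passed to executorService.submit(...) in run() calls selector.select(), which throws a checked IOException, yet it needs no try/catch of its own. This works because a lambda whose body never completes normally is resolved against the Callable overload of submit, and Callable.call() is allowed to throw. One consequence is that an exception escaping the loop ends up inside the returned Future instead of being printed. The following is a minimal sketch of that behavior. The class name SubmitCallableSketch and the method failingTaskCause are hypothetical and not part of the example repository.

```java
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class SubmitCallableSketch {
    // Submits a task that throws a checked exception and returns the cause
    // that the caller observes through the Future.
    static Throwable failingTaskCause() throws InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            // The body cannot complete normally, so the lambda is treated as a
            // Callable and may throw a checked exception without a try/catch.
            Future<?> future = executor.submit(() -> {
                throw new IOException("simulated select() failure");
            });
            try {
                future.get();
                return null; // not reached: the task always throws
            } catch (ExecutionException e) {
                return e.getCause(); // the exception was captured, not printed
            }
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        Throwable cause = failingTaskCause();
        // prints "IOException: simulated select() failure"
        System.out.println(cause.getClass().getSimpleName() + ": " + cause.getMessage());
    }
}
```

If nobody calls get() on the Future, as in Reactor.run(), a failure of this kind stays invisible, which is one reason to log inside the loop.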
server/handler/EventHandler.java
package io.chagchagchag.example_nio_aio.reactor.server.handler;
// Common abstraction for the Acceptor and the Handler
// Acceptor : dedicated to accepting new connections
// Handler : dedicated to handling READ events
public interface EventHandler {
    void handle() throws Exception;
}
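Because every handler shares the EventHandler type, the event loop can attach a handler to a SelectionKey at registration time and later call handle() without knowing the concrete class. The following is a small sketch of that attach-and-dispatch round trip. It uses a java.nio.channels.Pipe instead of a socket so that it runs in a single process. The class name AttachmentDispatchSketch, the method dispatchOnce, and the nested copy of the interface are hypothetical and exist only for this illustration.

```java
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

class AttachmentDispatchSketch {
    // Same shape as the EventHandler interface in this post.
    interface EventHandler {
        void handle() throws Exception;
    }

    // Runs one select/dispatch round over a Pipe and returns what the handler read.
    static List<String> dispatchOnce(String message) throws Exception {
        List<String> received = new ArrayList<>();
        Pipe pipe = Pipe.open();
        try (Selector selector = Selector.open();
             Pipe.SourceChannel source = pipe.source();
             Pipe.SinkChannel sink = pipe.sink()) {
            source.configureBlocking(false);

            // The handler reads whatever is available and records it.
            EventHandler handler = () -> {
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                source.read(buffer);
                buffer.flip();
                received.add(StandardCharsets.UTF_8.decode(buffer).toString());
            };
            // Register for READ events with the handler as the key's attachment.
            source.register(selector, SelectionKey.OP_READ, handler);

            // Make the source readable by writing to the other end of the pipe.
            ByteBuffer out = ByteBuffer.wrap(message.getBytes(StandardCharsets.UTF_8));
            while (out.hasRemaining()) {
                sink.write(out);
            }

            selector.select(2000); // wait up to 2 seconds for a ready key
            Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
            while (keys.hasNext()) {
                SelectionKey key = keys.next();
                keys.remove();
                if (key.isValid() && key.isReadable()) {
                    // Dispatch through the attachment, as Reactor.dispatch() does.
                    ((EventHandler) key.attachment()).handle();
                }
            }
        }
        return received;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(dispatchOnce("ping"));
    }
}
```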
server/handler/Acceptor.java
package io.chagchagchag.example_nio_aio.reactor.server.handler;

import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

public class Acceptor implements EventHandler {
    private final Selector selector;
    private final ServerSocketChannel serverChannel;

    public Acceptor(Selector selector, ServerSocketChannel serverChannel) {
        assert selector != null;
        assert serverChannel != null;
        this.selector = selector;
        this.serverChannel = serverChannel;
    }

    @Override
    public void handle() throws Exception {
        // In non-blocking mode accept() returns null when no connection is pending.
        SocketChannel clientChannel = serverChannel.accept();
        if (clientChannel == null) {
            return;
        }
        // The handler registers itself with the selector for OP_READ in its constructor.
        new TcpEventHandler(selector, clientChannel);
    }
}
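The server channel is configured as non-blocking, and in that mode ServerSocketChannel.accept() returns null immediately when no connection is pending, so code that calls accept() should be prepared for a null result. The following is a minimal sketch of that behavior, assuming the loopback interface is available. The class name NonBlockingAcceptSketch and the method acceptReturnsNullWhenIdle are hypothetical.

```java
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

class NonBlockingAcceptSketch {
    // Returns true when accept() on an idle non-blocking server channel yields null.
    static boolean acceptReturnsNullWhenIdle() throws Exception {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            // Port 0 asks the OS for any free port; nothing connects to it here.
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            server.configureBlocking(false);
            SocketChannel client = server.accept(); // returns immediately instead of blocking
            if (client != null) {
                client.close();
            }
            return client == null;
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(acceptReturnsNullWhenIdle());
    }
}
```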
server/handler/TcpEventHandler.java
package io.chagchagchag.example_nio_aio.reactor.server.handler;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class TcpEventHandler implements EventHandler {
    // Worker pool that builds and writes responses off the selector thread.
    private static final ExecutorService executorService = Executors.newFixedThreadPool(50);

    private final Selector selector;
    private final SocketChannel clientChannel;

    public TcpEventHandler(Selector selector, SocketChannel clientChannel) throws Exception {
        this.selector = selector;
        this.clientChannel = clientChannel;
        this.clientChannel.configureBlocking(false);
        // Register for READ events and attach this handler so the Reactor can dispatch to it.
        this.clientChannel.register(selector, SelectionKey.OP_READ, this);
    }

    @Override
    public void handle() throws Exception {
        String requestBody = handleRequest(this.clientChannel);
        if (requestBody == null) {
            // The peer closed the connection; closing the channel also cancels its key.
            clientChannel.close();
            return;
        }
        sendResponse(clientChannel, requestBody);
    }

    // Reads one request; returns null when the peer has closed the connection.
    public static String handleRequest(SocketChannel clientSocket) throws IOException {
        ByteBuffer requestByteBuffer = ByteBuffer.allocateDirect(1024);
        if (clientSocket.read(requestByteBuffer) < 0) {
            return null;
        }
        requestByteBuffer.flip(); // switch the buffer from writing to reading
        return StandardCharsets.UTF_8.decode(requestByteBuffer).toString();
    }

    public static void sendResponse(SocketChannel clientSocket, String requestBody) {
        CompletableFuture.runAsync(() -> {
            try {
                Thread.sleep(10); // artificial delay standing in for real work
                String content = "received : " + requestBody;
                ByteBuffer responseByteBuffer =
                    ByteBuffer.wrap(content.getBytes(StandardCharsets.UTF_8));
                // A non-blocking write may be partial, so loop until the buffer is drained.
                while (responseByteBuffer.hasRemaining()) {
                    clientSocket.write(responseByteBuffer);
                }
                clientSocket.close();
            } catch (Exception e) {
                e.printStackTrace(); // do not swallow failures silently
            }
        }, executorService);
    }
}
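handleRequest() calls flip() between reading from the channel and decoding the buffer. After a read the buffer's position sits just past the received bytes, so decoding without flip() would process the unused remainder of the buffer instead of the data. The sketch below illustrates the difference with a heap buffer, using put() as a stand-in for channel.read(). The class name BufferFlipSketch and the method decodeWritten are hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

class BufferFlipSketch {
    // Writes text into a 1024-byte buffer and decodes it, with or without flip().
    static String decodeWritten(String text, boolean flip) {
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        buffer.put(text.getBytes(StandardCharsets.UTF_8)); // stands in for channel.read(buffer)
        if (flip) {
            buffer.flip(); // limit = current position, position = 0
        }
        // decode() consumes the bytes between position and limit.
        return StandardCharsets.UTF_8.decode(buffer).toString();
    }

    public static void main(String[] args) {
        // With flip(): exactly the bytes that were written.
        System.out.println(decodeWritten("This is client", true));
        // Without flip(): only the untouched zero bytes after the data (1024 - 14 of them).
        System.out.println(decodeWritten("This is client", false).length());
    }
}
```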
ReactorMain.java
package io.chagchagchag.example_nio_aio.reactor;

import io.chagchagchag.example_nio_aio.reactor.server.Reactor;
import lombok.SneakyThrows;

public class ReactorMain {
    @SneakyThrows
    public static void main(String[] args) {
        System.out.println(" >>> started ");
        Reactor reactor = new Reactor(8080);
        // run() only submits the event loop to the Reactor's executor and returns immediately.
        reactor.run();
        // So this prints right away; the non-daemon executor thread keeps the JVM alive.
        System.out.println(" >>> finish ");
    }
}
Client code - client/JavaIOMultiClient.java
package io.chagchagchag.example_nio_aio.reactor.client;

import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class JavaIOMultiClient {
    private static final ExecutorService executorService = Executors.newFixedThreadPool(50);

    public static void main(String[] args) {
        System.out.println("start main");
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        long start = System.currentTimeMillis();
        // Send 10 requests concurrently, each over its own blocking socket.
        for (int i = 0; i < 10; i++) {
            var future = CompletableFuture.runAsync(() -> {
                try (Socket socket = new Socket()) {
                    socket.connect(new InetSocketAddress("localhost", 8080));
                    OutputStream out = socket.getOutputStream();
                    String requestBody = "This is client";
                    out.write(requestBody.getBytes(StandardCharsets.UTF_8));
                    out.flush();
                    InputStream in = socket.getInputStream();
                    byte[] responseBytes = new byte[1024];
                    // read() returns -1 if the server closed the connection without sending data.
                    int length = in.read(responseBytes);
                    String result = length > 0
                        ? new String(responseBytes, 0, length, StandardCharsets.UTF_8)
                        : "";
                    System.out.println("result: " + result.trim());
                } catch (Exception e) {
                    e.printStackTrace(); // do not swallow failures silently
                }
            }, executorService);
            futures.add(future);
        }
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        executorService.shutdown();
        System.out.println("end main");
        long end = System.currentTimeMillis();
        System.out.println("duration: " + ((end - start) / 1000.0));
    }
}
Running
- Start the server first. (ReactorMain.java)
- Then start the client. (client/JavaIOMultiClient.java)