Reactor 패턴 소켓통신, 예제

Reactor 패턴 소켓통신

개념에 대해서는 이전 문서에서 정리를 해두었기에 이번 문서에서는 소스코드만을 정리합니다.

예제는 https://github.com/chagchagchag/example-nio-aio/tree/main/src/main/java/io/chagchagchag/example_nio_aio/reactor (opens in a new tab) 에 정리해두었습니다.

server/Reactor.java

package io.chagchagchag.example_nio_aio.reactor.server;
 
import io.chagchagchag.example_nio_aio.reactor.server.handler.Acceptor;
import io.chagchagchag.example_nio_aio.reactor.server.handler.EventHandler;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import lombok.extern.slf4j.Slf4j;
 
@Slf4j
public class Reactor implements Runnable{
  // reactor 의 조건 : 별도의 스레드에서 동작할 수 있어야 한다. 따라서 Runnable 을 implements
 
  private int port;
  private ServerSocketChannel serverChannel;
  private Selector selector;
  private Acceptor acceptor;
 
 
  public Reactor(int port) throws Exception {
    assert port != 0;
    this.port = port;
    this.serverChannel = ServerSocketChannel.open();
 
    serverChannel.bind(new InetSocketAddress("localhost", port));
    serverChannel.configureBlocking(false);
 
    this.selector = Selector.open();
 
    this.acceptor = new Acceptor(selector, serverChannel);
    serverChannel.register(selector, SelectionKey.OP_ACCEPT).attach(acceptor);
  }
 
  private static ExecutorService executorService = Executors.newSingleThreadExecutor();
  @Override
  public void run() {
    executorService.submit(() -> {
      while(true){
        selector.select();
        Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
 
        while(selectedKeys.hasNext()){
          SelectionKey key = selectedKeys.next();
          selectedKeys.remove();
 
          dispatch(key); // dispatcher 에 SelectionKey 를 넘겨줌
        }
      }
    });
  }
 
  public void dispatch(SelectionKey selectionKey) throws Exception{
    // register 에서 attach 했던 Handler 객체를 가져옵니다.
    EventHandler eventHandler = (EventHandler) selectionKey.attachment();
 
    // 그리고 이 handler 를 통해서 handle 을 실행
    if(selectionKey.isReadable() || selectionKey.isAcceptable()){
      eventHandler.handle();
    }
  }
}

server/handler/EventHandler.java

package io.chagchagchag.example_nio_aio.reactor.server.handler;
 
// Acceptor, Handler 에 대한 추상 기능을 제공하는 타입
// Acceptor : Accept 하는 역할을 전담
// Handler : READ 이벤트에 집중해서 처리를 전담
public interface EventHandler {
  void handle() throws Exception;
}

server/handler/Acceptor.java

package io.chagchagchag.example_nio_aio.reactor.server.handler;
 
import java.io.IOException;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
 
public class Acceptor implements EventHandler {
  private final Selector selector;
  private final ServerSocketChannel serverChannel;
  public Acceptor(Selector selector, ServerSocketChannel serverChannel) throws IOException {
    assert selector != null;
    assert serverChannel != null;
 
    this.selector = selector;
    this.serverChannel = serverChannel;
  }
 
  @Override
  public void handle() throws Exception{
    SocketChannel clientChannel = serverChannel.accept();
    new TcpEventHandler(selector, clientChannel);
  }
}

server/handler/TcpEventHandler.java

package io.chagchagchag.example_nio_aio.reactor.server.handler;
 
import io.chagchagchag.example_nio_aio.reactor.server.handler.EventHandler;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
 
public class TcpEventHandler implements EventHandler {
  private final Selector selector;
  private final SocketChannel clientChannel;
 
  public TcpEventHandler(Selector selector, SocketChannel clientChannel)
  throws Exception {
    this.selector = selector;
    this.clientChannel = clientChannel;
 
    this.clientChannel.configureBlocking(false);
    this.clientChannel.register(selector, SelectionKey.OP_READ).attach(this);
  }
 
  @Override
  public void handle() throws Exception{
    String requestBody = handleRequest(this.clientChannel);
    sendResponse(clientChannel, requestBody);
  }
 
  public static String handleRequest(SocketChannel clientSocket) throws IOException {
    ByteBuffer requestByteBuffer = ByteBuffer.allocateDirect(1024);
    clientSocket.read(requestByteBuffer);
 
    requestByteBuffer.flip();
    String requestBody = StandardCharsets.UTF_8.decode(requestByteBuffer).toString();
 
    return requestBody;
  }
 
  private static ExecutorService executorService = Executors.newFixedThreadPool(50);
 
  public static void sendResponse(SocketChannel clientSocket, String requestBody) throws IOException {
    CompletableFuture.runAsync(() -> {
      try{
        Thread.sleep(10);
 
        String content = "received : " + requestBody;
        ByteBuffer responseByteBuffer = ByteBuffer.wrap(content.getBytes());
        clientSocket.write(responseByteBuffer);
        clientSocket.close();
 
      } catch (Exception e){}
    }, executorService);
  }
}

ReactorMain.java

package io.chagchagchag.example_nio_aio.reactor;
 
import io.chagchagchag.example_nio_aio.reactor.server.Reactor;
import lombok.SneakyThrows;
 
public class ReactorMain {
  @SneakyThrows
  public static void main(String[] args) {
    System.out.println(" >>> started ");
    Reactor reactor = new Reactor(8080);
    reactor.run(); //
    System.out.println(" >>> finish ");
  }
}

클라이언트 코드 - client/JavaIOMultiClient.java

package io.chagchagchag.example_nio_aio.reactor.client;
 
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
 
public class JavaIOMultiClient {
  private static ExecutorService executorService = Executors.newFixedThreadPool(50);
  public static void main(String[] args) {
    System.out.println("start main");
    List<CompletableFuture<Void>> futures = new ArrayList<>();
    long start = System.currentTimeMillis();
 
    for (int i = 0; i < 10; i++) {
      var future = CompletableFuture.runAsync(() -> {
        try (Socket socket = new Socket()) {
          socket.connect(new InetSocketAddress("localhost", 8080));
 
          OutputStream out = socket.getOutputStream();
          String requestBody = "This is client";
          out.write(requestBody.getBytes());
          out.flush();
 
          InputStream in = socket.getInputStream();
          byte[] responseBytes = new byte[1024];
          in.read(responseBytes);
          System.out.println("result: " + new String(responseBytes).trim());
        } catch (Exception e) {}
      }, executorService);
 
      futures.add(future);
    }
 
    CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
    executorService.shutdown();
    System.out.println("end main");
    long end = System.currentTimeMillis();
    System.out.println("duration: " + ((end - start) / 1000.0));
  }
}

실행

  • 서버를 먼저 띄웁니다. (ReactorMain.java)

  • 클라이언트를 띄웁니다. (client/JavaIOMultiClient.java)