Java NIO 소켓통신

Java IO Socket 통신

Java IO 버전의 소켓 통신의 절차는 아래와 같습니다.


Java NIO 셀렉터/채널 기반 소켓 프로그래밍

JAVA NIO 의 셀렉터/채널 기반의 소켓 프로그래밍은 하나의 스레드가 여러 채널을 처리(handle)할 수 있는 구조입니다.

각각의 channel 들은 Selector 에 등록되며, 하나의 스레드가 Selector 를 관리합니다. 이렇게 해서 스레드 하나가 Selector 를 통해 여러 Channel 을 처리할 수 있는 멀티플렉싱 방식으로 동작합니다.

구체적인 통신 과정은 다음과 같습니다.

channel 의 register() 메서드는 selector 객체를 전달받아서 자기자신을 Selector 를 등록합니다. 이 방식으로 하나 이상의 채널들을 셀렉터에 등록합니다.

만약 selector.register(...) 방식이었다면, 동시성 문제가 있었겠지만, selector 객체를 channel 에 넘겨준 방식은 기발한 방식이라고 느꼈던 것 같습니다.

이후 select() 메서드를 통해 호출할 때 등록된 채널 들 중 이벤트 준비가 완료된 하나 이상 발생할 때 까지 block 됩니다. select() 메서드 내에서 하나 이상의 채널의 이벤트가 발생하면 select() 메서드 다음으로 넘어갑니다.

그리고 selector.selectedKeys() 를 통해서 준비가 완료된 채널들의 목록을 통해서 채널들의 목록을 반환하며, 이 채널들의 이벤트 성격(Acceptable, Readable 등)에 따라 각각의 처리를 합니다.

Channel(채널), Buffer(버퍼)


Channel 은 Stream 과 유사한 개념이며 Stream 과 비교해서 아래의 차이점들이 있습니다.

여기서 이야기하는 Stream 은 Java 8 의 Stream 이 아닌 Java I/O 에서의 InputStream, OutputStream 과 같은 IO 연산 관련 Stream 을 의미합니다.

  • Channel 은 Read, Write 모두 가능하지만, Stream 은 단방향(Read 만 가능하거나 Write만 가능)으로만 가능합니다.
  • Channel 은 비동기(Asynchronous)적으로 읽고 쓰는 것이 가능합니다.
  • Channel 은 항상 Buffer 로부터 읽고 쓰는 작업을 수행합니다.

Channel 에는 대표적으로 FileChannel, DatagramChannel, SocketChannel, ServerSocketChannel 이 있습니다.

  • FileChannel : 파일에 데이터를 Read/Write 합니다.
  • DatagramChannel : UDP 통신을 통해 네트워크 데이터를 Read/Write 합니다.
  • SocketChannel : TCP 통신을 통해 네트워크 데이터를 Read/Write 합니다.
  • ServerSocketChannel : TCP Listening(수신작업)을 수행합니다. 유입되는 연결마다 SocketChannel 이 만들어집니다.

사용자가 채널에 데이터를 읽거나 쓰기위해서는 Buffer 를 통해서 읽고 씁니다. 데이터를 Buffer 를 통해 읽거나, Buffer 를 통해서 씁니다. Buffer 에는 아래와 같이 여러 종류의 Buffer 가 있습니다.

  • ByteBuffer
  • MappedByteBuffer
  • CharBuffer
  • ShortBuffer
  • IntBuffer
  • LongBuffer
  • FloatBuffer
  • DoubleBuffer

지금부터는 Buffer 라는 용어보다는 버퍼 라는 단어로 사용하겠습니다. (영타 전환이 불편해서요)

버퍼에 데이터를 쓸 때 버퍼는 지금까지 쓴 데이터의 양을 기록합니다. 데이터를 읽어들여야 할 때에는 flip() 메서드를 호출해서 버퍼를 쓰기 모드에서 읽기 모드로 전환합니다.

데이터를 모두 읽은 후에는 버퍼를 지우고 다시 쓸 준비를 해야하는데 이때 clear(), compact() 메서드 중 하나를 호출해서 전체 버퍼를 지웁니다. clear() 메서드는 버퍼 전체를 지우며, compact() 메서드는 이미 읽은 데이터만 지웁니다.

NIO 소켓 프로그래밍 API

NIO 소켓 프로그래밍의 일반적인 API 와 통신 절차 등을 간략하게 정리해봅니다.


ServerSocketChannel 객체 생성, 소켓 개방

먼저 ServerSocketChannel 을 여는 과정입니다. ServerSocketChannel 객체의 static 메서드인 open() 메서드를 이용해서 Channel 을 열고 이 Channel 에 listen 할 주소를 bind 합니다. 그리고 마지막으로 channel 에 대해 configureBlocking(false) 를 지정해서 채널의 연산 동작은 Non Blocking 방식으로 동작하도록 지정해줬습니다.

ServerSocketChannel channel = ServerSocketChannel.open();
 
channel.bind(new InetSocketAddress("localhost", 8080));
channel.configureBlocking(false); // Non Blocking 방식으로 Channel 의 이벤트를 읽어들이도록 지정 

Selector 객체 생성

Selector selector = Selector.open();

Selector 에 Channel 등록

// Selector 에 channel 을 등록
SelectionKey selectionKey = channel.register(selector, SelectionKey.OP_ACCEPT);

Selector 를 이용해서 채널 선택

selector.select();

참고

  • select()
    • 등록한 이벤트에 대해 하나 이상의 채널이 준비될 때 까지 blocking
    • 몇개의 채널이 준비되었는지 준비된 채널의 수를 return
    • 이 채널의 수는 마지막으로 select() 를 호출한 이후 준비된 채널 수를 의미
  • select(long timeout)
    • 최대 몇 ms 동안 timeout 을 적용할지를 지정한 select() 메서드
  • selectNow()
    • 채널이 준비될 때 까지 blocking 하지 않고 이미 준비되어 있는 채널이 있으면 그것을 즉시 return

Channel 에 이벤트가 발생할 때 까지 대기하고 있는 것을 의미합니다. 위에서는 channel.configureBlocking(false) 로 지정했기 때문에 Non Blocking 방식으로 대기하게 됩니다. 마치 위의 코드는 블로킹 코드 처럼 보이지만 채널이 준비되기를 기다리는 동안 내부적으로는 다른 일을 처리하고 있습니다.

e.g.

try{
    try(
          ServerSocketChannel serverChannel = ServerSocketChannel.open();
          Selector selector = Selector.open();
      ){
        serverChannel.bind(new InetSocketAddress("localhost", 8080));
        serverChannel.configureBlocking(false);
        serverChannel.register(selector, SelectionKey.OP_ACCEPT);
        
        while(true){
          selector.select();
          // ...
        }
    }
}
catch(...){}
finally{ executorService.shutdown(); }

selector.selectedKeys() 를 통해 이벤트가 발생한 채널들의 목록을 Set 으로 획득

이벤트가 발생한 채널들을 Set<SelectionKey> 으로 획득합니다.

Set<SelectionKey> selectedKeys = selector.selectedKeys();

참고로 Selector.selectedKeys() 가 반환하는 Set<SelectionKey> 는 Thread Safe 하지 않기 때문에 Concurrent*** 계열의 자료구조에 복사해서 불변성 처리를 하는 것도 좋은 방법입니다.


각각의 SelectionKey 에 대해 이벤트 처리 작업 수행

이벤트가 발생한 각각의 채널들에 대해 이벤트 처리를 수행합니다. 준비된 채널들을 획득하기 위해 획득한 Set<SelectionKey> 는 이벤트 처리가 완료되면 처리가 완료된 채널은 remove() 를 통해 삭제해줍니다.

try{
    try(
          ServerSocketChannel serverChannel = ServerSocketChannel.open();
          Selector selector = Selector.open();
      ){
        // ...
        
        while(true){
          selector.select();
          
          // 준비된 채널들을 Set 형태로 변환후 Iterator 형식으로 변환
          Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
 
          // 채널들 순회하면서 이벤트 처리 
          while(selectedKeys.hasNext()){
            SelectionKey key = selectedKeys.next();
            // selectedKeys.remove(); // 여기에서 remove() 해도 상관 없긴 합니다.
 
            if(key.isAcceptable()){
              SocketChannel clientSocket = ((ServerSocketChannel)key.channel()).accept();
              clientSocket.configureBlocking(false);
              clientSocket.register(selector, SelectionKey.OP_READ);
            }
            else if(key.isReadable()){
              SocketChannel clientSocket = (SocketChannel) key.channel();
              String requestBody = handleRequest(clientSocket);
              sendResponse(clientSocket, requestBody);
            }
            
            selectedKeys.remove();
          }
        }
    }
}
catch(...){}
finally{ executorService.shutdown(); }

참고) SelectionKey

SelectionKey 상수

SelectionKey 내에 정의된 상수들은 아래와 같습니다.

SelectionKey.OP_CONNECT
SelectionKey.OP_ACCEPT
SelectionKey.OP_READ
SelectionKey.OP_WRITE

위의 이벤트 상수들은 아래와 같이 조합해서 사용하는 것이 가능합니다.

int events = SelectionKey.OP_READ | SelectionKey.OP_WRITE;

interestOps

// Selector 에 channel 을 등록
SelectionKey selectionKey = channel.register(selector, SelectionKey.OP_ACCEPT);

SelectionKey 에는 interestOps() 라는 메서드가 있으며 셀렉터에 등록된 채널이 확인하려고 하는 이벤트들의 조합을 int 정수형으로 돌려받을 수 있습니다. 예를 들면 아래와 같은 형식입니다.

int interestSet = selectionKey.interestOps();
 
boolean isInterestedInAccept  = interestSet & SelectionKey.OP_ACCEPT;
boolean isInterestedInConnect = interestSet & SelectionKey.OP_CONNECT;
boolean isInterestedInRead    = interestSet & SelectionKey.OP_READ;
boolean isInterestedInWrite   = interestSet & SelectionKey.OP_WRITE;

readyOps

readyOps() 는 셀렉터에 등록된 채널에서 준비되어 처리 가능한 이벤트들의 집합을 찾을 때 사용하는 메서드입니다.

int readySet = SelectionKey.readyOps();
selectionKey.isAcceptable();
selectionKey.isConnectable();
selectionKey.isReadable();
selectionKey.isWritable();

SelectionKey 로 채널, 셀렉터에 접근 가능

Channel  channel  = selectionKey.channel();
Selector selector = selectionKey.selector(); 

SelectionKey 에 객체를 첨부하는 것 역시 가능

채널에 추가 정보를 첨부하려고 하거나, 채널에서 사용하는 버퍼 객체 등을 첨부하려 할 때 사용 가능한 방법입니다.

selectionKey.attach(theObject);
Object attachedObj = selectionKey.attachment();

셀렉터에 채널을 등록할때 객체를 첨부하는 것 역시 가능

SelectionKey key = channel.register(selector, SelectionKey.OP_READ, theObject);

위의 코드는 아래와 같이 변경하는 것 역시 가능합니다.

SelectionKey key = channel.register(selector, SelectionKey.OP_READ).attach(acceptor);

e.g.1 : 단순한 버전의 Client, Server

예제코드는 https://github.com/chagchagchag/webflux-mongo-mysql-redis/tree/main/demo-nio/src/main/java/io/chagchagchag/example/foobar/nio/socket (opens in a new tab)Example1_ 로 시작하는 파일명 들입니다.


Client

package io.chagchagchag.example.foobar.nio.socket;
 
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
 
@Slf4j
public class Example1_Client_SocketChannel {
  @SneakyThrows
  public static void main(String [] args){
    log.info("main function start");
    try(var socketChannel = SocketChannel.open()){
      var address = new InetSocketAddress("localhost", 8080);
      var connected = socketChannel.connect(address);
      log.info("connected : {}", connected);
 
      String message = "안녕하세요. 클라이언트에요.";
      ByteBuffer requestMessageBuffer = ByteBuffer.wrap(message.getBytes());
      socketChannel.write(requestMessageBuffer);
      requestMessageBuffer.clear();
 
      ByteBuffer result = ByteBuffer.allocateDirect(1024);
      while(socketChannel.read(result) > 0){
        result.flip();
        log.info("서버 응답 (Response) = {}", StandardCharsets.UTF_8.decode(result));
        result.clear();
      }
    }
    log.info("main function end");
  }
}

Server

Channel 을 통해서 소켓 커넥션을 열고 Channel 로 accept() 를 합니다. 소켓통신을 배워본적이 있다면 accept()는 클라이언트의 응답을 기다리는 동작이라는 것을 알고 계실 겁니다. Java IO 에서의 ServerSocket 의 accept() 는 블로킹이 발생하는 작업이지만, NIO는 accept() 를 논블로킹 방식으로 수행합니다.

package io.chagchagchag.example.foobar.nio.socket;
 
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.charset.StandardCharsets;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
 
@Slf4j
public class Example1_Server_ServerSocketChannel {
  @SneakyThrows
  public static void main (String [] args){
    log.info("main function started");
    try(var serverChannel = ServerSocketChannel.open()){
      var address = new InetSocketAddress("localhost", 8080);
      serverChannel.bind(address);
 
      try(var clientSocket = serverChannel.accept()){
        ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
        clientSocket.read(buffer);
        buffer.flip();
 
        var decodeMessage = StandardCharsets.UTF_8.decode(buffer);
        var clientMessage = String.valueOf(decodeMessage);
        log.info("클라이언트로부터 온 메시지 = {}", clientMessage);
 
        var responseMessage = "안녕하세요. 저는 서버입니다.";
        var responseBuffer = ByteBuffer.wrap(responseMessage.getBytes());
        clientSocket.write(responseBuffer);
        responseBuffer.flip();
      }
    }
    log.info("main function end");
  }
}

출력결과

Client

10:37:59.175 [main] INFO io.chagchagchag.example.foobar.nio.socket.Example1_Client_SocketChannel -- main function start
10:37:59.196 [main] INFO io.chagchagchag.example.foobar.nio.socket.Example1_Client_SocketChannel -- connected : true
10:38:17.826 [main] INFO io.chagchagchag.example.foobar.nio.socket.Example1_Client_SocketChannel -- 서버 응답 (Response) = 안녕하세요. 저는 서버입니다.
10:38:17.827 [main] INFO io.chagchagchag.example.foobar.nio.socket.Example1_Client_SocketChannel -- main function end

Process finished with exit code 0

Server

10:36:40.736 [main] INFO io.chagchagchag.example.foobar.nio.socket.Example1_Server_ServerSocketChannel -- main function started

Process finished with exit code 130

e.g. 2 :